From dfe0c18be270572d9046560be4c88fb93403e4b5 Mon Sep 17 00:00:00 2001 From: mario Date: Thu, 8 May 2025 12:07:19 +0700 Subject: [PATCH] init: tested on hangtuah to cloud --- .gitignore | 2 + README.md | 134 ++++++++- config/settings.py | 44 +++ main.py | 552 ++++++++++++++++++++++++++++++++++++ requirements.txt | 6 + services/dicom_finder.py | 272 ++++++++++++++++++ services/dicom_retriever.py | 359 +++++++++++++++++++++++ services/dicom_sender.py | 358 +++++++++++++++++++++++ utils/cleanup.py | 73 +++++ utils/dicom_utils.py | 86 ++++++ utils/error_handler.py | 53 ++++ utils/logger.py | 54 ++++ 12 files changed, 1992 insertions(+), 1 deletion(-) create mode 100644 config/settings.py create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 services/dicom_finder.py create mode 100644 services/dicom_retriever.py create mode 100644 services/dicom_sender.py create mode 100644 utils/cleanup.py create mode 100644 utils/dicom_utils.py create mode 100644 utils/error_handler.py create mode 100644 utils/logger.py diff --git a/.gitignore b/.gitignore index 5d381cc..b874c9d 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,8 @@ __pycache__/ # C extensions *.so +#log +*.json # Distribution / packaging .Python diff --git a/README.md b/README.md index 037ec7b..10ee5d9 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,135 @@ # pydicom-migrasi-clarity -Untuk migrasi PACS Clarity ke ABPACS dengan cara: FindSCU by date, GetSCU for each StudyIUID found, StoreSCU to ABPACS, then send API to HIS2 to fill pacs_result_series \ No newline at end of file +## Tujuan Proyek +Aplikasi ini dibuat untuk melakukan migrasi data PACS dari sistem Clarity ke ABPACS dengan proses sebagai berikut: +1. Melakukan pencarian studi DICOM menggunakan FindSCU berdasarkan tanggal +2. Mengambil data DICOM lengkap menggunakan GetSCU untuk setiap StudyIUID yang ditemukan +3. Mengirim data DICOM ke ABPACS menggunakan StoreSCU +4. Mengirim data ke API HIS2 untuk mengisi pacs_result_series +5. Mencatat seluruh proses dalam log terstruktur + +## Alur Kerja Detail +Aplikasi ini menggunakan pendekatan berbasis Python murni dengan bantuan library pynetdicom. Berikut tahapan detail proses: + +1. **Pencarian (FindSCU)**: + - Mencari studi berdasarkan rentang tanggal (StudyDate) + - Mengekstrak metadata penting seperti StudyInstanceUID, AccessionNumber, PatientID + - Menyimpan hasil pencarian dalam format JSON + +2. **Pengambilan (GetSCU)**: + - Mengunduh seluruh data DICOM untuk setiap StudyInstanceUID yang ditemukan + - Menyimpan data DICOM dalam struktur direktori terorganisir + - Data disimpan sementara dan dihapus otomatis setelah pengiriman + +3. **Pengiriman (StoreSCU)**: + - Mengirim data DICOM yang diunduh ke server ABPACS + - Melacak status pengiriman dan menangani error atau kegagalan + - Menghapus data DICOM setelah pengiriman berhasil + +4. **Integrasi HIS**: + - Mengirim metadata DICOM ke API HIS2 + - Memastikan informasi studi dan series tersimpan di sistem HIS + - Menyimpan log hasil komunikasi dengan sistem HIS + +## Persyaratan +- Python 3.9 +- Pustaka Python utama: + - pydicom + - pynetdicom + - requests + - python-dateutil + - retry +- Akses jaringan ke server PACS sumber dan tujuan + +## Cara Mengkloning Repositori + +```bash +git clone https://devone.aplikasi.web.id/gitea/mario/pydicom-migrasi-clarity.git +cd pydicom-migrasi-clarity +``` + +## Instalasi dengan Virtual Environment + +### 1. Membuat Virtual Environment +```bash +# Untuk Linux +python3.9 -m venv venv + +# Untuk Windows +python -m venv venv +``` + +### 2. Mengaktifkan Virtual Environment +```bash +# Untuk Linux +source venv/bin/activate + +# Untuk windows +venv/Scripts/activate +``` + +### 3. Menginstall Dependensi +```bash +pip install -r requirements.txt +``` + +## Konfigurasi PACS + +**Sesuaikan `SOURCE_PACS` dan `DESTINATION_PACS` pada `config/settings.py`** + +## Cara Menjalankan Aplikasi + +### Contoh Penggunaan Umum +Untuk migrasi data harian baru: +```bash +python main.py process --start-date 20250507 --end-date 20250507 +``` + +### Perintah pendukung yang Tersedia + +Program ini mendukung beberapa mode operasi: + +1. **Process** - Menjalankan alur kerja lengkap (pencarian, pengambilan, pengiriman): + ```bash + python main.py process --start-date 20250501 --end-date 20250502 + ``` + +2. **Find-Studies** - Hanya mencari studi berdasarkan rentang tanggal: + ```bash + python main.py find-studies --start-date 20250501 --end-date 20250502 + ``` + +3. **Get-Study** - Mengambil studi tertentu berdasarkan StudyInstanceUID: + ```bash + python main.py get-study --study-uid 1.2.826.1.3680043.9.5282.150415.30338.202504010001 + ``` + +4. **Send-Study** - Mengirim studi tertentu ke PACS tujuan: + ```bash + python main.py send-study --study-uid 1.2.826.1.3680043.9.5282.150415.30338.202504010001 + ``` + +### Parameter Umum +* `--start-date`: Tanggal awal pencarian dalam format YYYYMMDD +* `--end-date`: Tanggal akhir pencarian dalam format YYYYMMDD +* `--study-uid`: StudyInstanceUID untuk operasi pada satu studi +* `--series-uid`: SeriesInstanceUID untuk operasi pada satu seri +* `--skip-existing`: Lewati studi yang sudah memiliki log (untuk melanjutkan proses yang terhenti) + + + +## Struktur Direktori +- `config/`: Berisi file konfigurasi aplikasi +- `logs/`: Menyimpan log operasi dan hasil pengiriman +- `output/`: Tempat menyimpan hasil DICOM dan file JSON +- `services/`: Modul-modul untuk operasi DICOM (finder, retriever, sender) +- `utils/`: Fungsi-fungsi pembantu dan utilitas + +## Troubleshooting + +Jika menemui masalah, silakan periksa file log di direktori `logs/` untuk informasi lebih detail. + +Untuk menonaktifkan virtual environment setelah selesai: +```bash +deactivate +``` \ No newline at end of file diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..a8ca53f --- /dev/null +++ b/config/settings.py @@ -0,0 +1,44 @@ +""" +Configuration settings for DICOM operations. +""" + +# DICOM Network Settings +SOURCE_AET = "PYNETDICOM" # Our AE Title +SOURCE_PORT = 8888 # Our port + +# Source PACS Configuration (where to query/get data from) +SOURCE_PACS = { + "host": "192.168.22.3", + "port": 11112, + "aet": "ABPACS" +} + +# Destination PACS Configuration (where to send data to) +DESTINATION_PACS = { + "host": "152.42.173.210", + "port": 11112, + "aet": "ABPACS" +} + +# Network operation timeouts (in seconds) +NETWORK_TIMEOUT = 30 +ASSOCIATION_TIMEOUT = 5 + +# Retry configuration +MAX_RETRIES = 2 +RETRY_DELAY = 5 # seconds + +# Logging Configuration +LOG_LEVEL = "INFO" +LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" +LOG_FILE = "logs/dicom_operations.log" +MAX_LOG_SIZE = 10 * 1024 * 1024 # 10 MB +BACKUP_COUNT = 5 + +# Output Configuration +JSON_OUTPUT_DIR = "output/json" +DICOM_STORE_DIR = "output/dicom" + +# HIS Configuration +HIS_HOST = "localhost:8787" +HIS_URL = "/result_series" \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..b4352f7 --- /dev/null +++ b/main.py @@ -0,0 +1,552 @@ +#!/usr/bin/env python3 +""" +Main entry point for DICOM data retrieval and routing automation. +""" +import os +import sys +import json +import argparse +import requests +from datetime import datetime +from config import settings +from utils.logger import main_logger as logger +from utils.dicom_utils import save_json_data, create_directory_if_not_exists +from utils.cleanup import register_exit_handlers, register_cleanup_dir +from services.dicom_finder import DicomFinder +from services.dicom_retriever import DicomRetriever +from services.dicom_sender import DicomSender + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser(description='DICOM Data Retrieval and Routing Automation') + + # Command options + subparsers = parser.add_subparsers(dest='command', help='Command to execute') + + # Find studies command + find_parser = subparsers.add_parser('find-studies', help='Find studies by date range') + find_parser.add_argument('--start-date', required=True, help='Start date in YYYYMMDD format') + find_parser.add_argument('--end-date', required=True, help='End date in YYYYMMDD format') + find_parser.add_argument('--output-file', help='JSON output file (default: studies_YYYYMMDD.json)') + + # Find series command + series_parser = subparsers.add_parser('find-series', help='Find series for a study') + series_parser.add_argument('--study-uid', required=True, help='Study Instance UID') + series_parser.add_argument('--output-file', help='JSON output file (default: series_STUDYUID.json)') + + # Find first instance command + instance_parser = subparsers.add_parser('find-instance', help='Find first instance for a series') + instance_parser.add_argument('--study-uid', required=True, help='Study Instance UID') + instance_parser.add_argument('--series-uid', required=True, help='Series Instance UID') + instance_parser.add_argument('--output-file', help='JSON output file (default: instance_SERIESUID.json)') + + # Get study command + get_study_parser = subparsers.add_parser('get-study', help='Retrieve a complete study') + get_study_parser.add_argument('--study-uid', required=True, help='Study Instance UID to retrieve') + + # Get series command + get_series_parser = subparsers.add_parser('get-series', help='Retrieve a specific series') + get_series_parser.add_argument('--study-uid', required=True, help='Study Instance UID') + get_series_parser.add_argument('--series-uid', required=True, help='Series Instance UID to retrieve') + + # Send study command + send_study_parser = subparsers.add_parser('send-study', help='Send a complete study to destination PACS') + send_study_parser.add_argument('--study-uid', required=True, help='Study Instance UID to send') + send_study_parser.add_argument('--study-path', help='Path to study directory (default: DICOM_STORE_DIR/study_uid)') + + # Send series command + send_series_parser = subparsers.add_parser('send-series', help='Send a specific series to destination PACS') + send_series_parser.add_argument('--study-uid', required=True, help='Study Instance UID') + send_series_parser.add_argument('--series-uid', required=True, help='Series Instance UID to send') + + # Send file command + send_file_parser = subparsers.add_parser('send-file', help='Send a single DICOM file to destination PACS') + send_file_parser.add_argument('--file-path', required=True, help='Path to DICOM file to send') + + # Process command - full workflow: find, get, and send DICOM data for a date range + process_parser = subparsers.add_parser('process', help='Process full workflow (find, get, send) for a date range') + process_parser.add_argument('--start-date', required=True, help='Start date in YYYYMMDD format') + process_parser.add_argument('--end-date', required=True, help='End date in YYYYMMDD format') + process_parser.add_argument('--log-dir', help='Directory to save JSON logs (default: JSON_OUTPUT_DIR)') + process_parser.add_argument('--skip-existing', action='store_true', help='Skip studies that already have JSON logs') + + return parser.parse_args() + +def find_studies(args): + """ + Find studies by date range and save results to JSON. + + Args: + args: Command line arguments + """ + start_date = args.start_date + end_date = args.end_date + + # Validate date format + try: + datetime.strptime(start_date, '%Y%m%d') + datetime.strptime(end_date, '%Y%m%d') + except ValueError: + logger.error("Invalid date format. Use YYYYMMDD format.") + sys.exit(1) + + finder = DicomFinder() + + try: + # Find studies + studies = finder.find_studies_by_date_range(start_date, end_date) + + # Convert to list of dictionaries for JSON serialization + study_data = [] + for study in studies: + study_dict = { + 'StudyInstanceUID': study.StudyInstanceUID, + 'StudyDate': study.StudyDate, + 'StudyTime': getattr(study, 'StudyTime', ''), + 'AccessionNumber': getattr(study, 'AccessionNumber', ''), + 'PatientID': getattr(study, 'PatientID', ''), # MedrecID + 'StudyDescription': getattr(study, 'StudyDescription', ''), + 'NumberOfStudyRelatedSeries': getattr(study, 'NumberOfStudyRelatedSeries', '') + } + study_data.append(study_dict) + + # Save to JSON + output_file = args.output_file or f"studies_{start_date}_to_{end_date}.json" + save_json_data(study_data, output_file, settings.JSON_OUTPUT_DIR) + + logger.info(f"Found {len(study_data)} studies from {start_date} to {end_date}") + logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}") + + except Exception as e: + logger.error(f"Error finding studies: {str(e)}") + sys.exit(1) + +def find_series(args): + """ + Find series for a study and save results to JSON. + + Args: + args: Command line arguments + """ + study_uid = args.study_uid + + finder = DicomFinder() + + try: + # Find series + series_list = finder.find_series_for_study(study_uid) + + # Convert to list of dictionaries for JSON serialization + series_data = [] + for series in series_list: + series_dict = { + 'StudyInstanceUID': series.StudyInstanceUID, + 'SeriesInstanceUID': series.SeriesInstanceUID, + 'SeriesNumber': getattr(series, 'SeriesNumber', ''), + 'SeriesDescription': getattr(series, 'SeriesDescription', ''), + 'Modality': getattr(series, 'Modality', ''), + 'NumberOfSeriesRelatedInstances': getattr(series, 'NumberOfSeriesRelatedInstances', '') + } + series_data.append(series_dict) + + # Save to JSON + output_file = args.output_file or f"series_{study_uid}.json" + save_json_data(series_data, output_file, settings.JSON_OUTPUT_DIR) + + logger.info(f"Found {len(series_data)} series for study {study_uid}") + logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}") + + except Exception as e: + logger.error(f"Error finding series: {str(e)}") + sys.exit(1) + +def find_instance(args): + """ + Find first instance for a series and save results to JSON. + + Args: + args: Command line arguments + """ + study_uid = args.study_uid + series_uid = args.series_uid + + finder = DicomFinder() + + try: + # Find first instance + instance = finder.find_first_instance_for_series(study_uid, series_uid) + + if instance: + # Convert to dictionary for JSON serialization + instance_dict = { + 'StudyInstanceUID': instance.StudyInstanceUID, + 'SeriesInstanceUID': instance.SeriesInstanceUID, + 'SOPInstanceUID': instance.SOPInstanceUID, + 'SOPClassUID': getattr(instance, 'SOPClassUID', ''), + 'InstanceNumber': getattr(instance, 'InstanceNumber', '') + } + + # Save to JSON + output_file = args.output_file or f"instance_{series_uid}.json" + save_json_data(instance_dict, output_file, settings.JSON_OUTPUT_DIR) + + logger.info(f"Found first instance for series {series_uid}") + logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}") + else: + logger.warning(f"No instances found for series {series_uid}") + + except Exception as e: + logger.error(f"Error finding instance: {str(e)}") + sys.exit(1) + +def get_study(args): + """ + Retrieve a complete study using C-GET. + + Args: + args: Command line arguments + """ + study_uid = args.study_uid + + logger.info(f"Starting retrieval of study: {study_uid}") + + retriever = DicomRetriever() + + try: + # Retrieve the study + result = retriever.retrieve_study(study_uid) + + if result['success']: + logger.info(f"Successfully retrieved study {study_uid}") + logger.info(f"Retrieved {result['successful_instances']} of {result['total_instances']} instances") + logger.info(f"DICOM files stored in: {result['study_dir']}") + else: + logger.error(f"Failed to retrieve study {study_uid}: {result['status']}") + if result['error']: + logger.error(f"Error details: {result['error']}") + sys.exit(1) + + except Exception as e: + logger.error(f"Error retrieving study: {str(e)}") + sys.exit(1) + +def get_series(args): + """ + Retrieve a specific series using C-GET. + + Args: + args: Command line arguments + """ + study_uid = args.study_uid + series_uid = args.series_uid + + logger.info(f"Starting retrieval of series: {series_uid} from study: {study_uid}") + + retriever = DicomRetriever() + + try: + # Retrieve the series + result = retriever.retrieve_series(study_uid, series_uid) + + if result['success']: + logger.info(f"Successfully retrieved series {series_uid}") + logger.info(f"Retrieved {result['successful_instances']} of {result['total_instances']} instances") + logger.info(f"DICOM files stored in: {result['series_dir']}") + else: + logger.error(f"Failed to retrieve series {series_uid}: {result['status']}") + if result['error']: + logger.error(f"Error details: {result['error']}") + sys.exit(1) + + except Exception as e: + logger.error(f"Error retrieving series: {str(e)}") + sys.exit(1) + +def send_study(args): + """ + Send a complete study to destination PACS using C-STORE. + + Args: + args: Command line arguments + """ + study_uid = args.study_uid + study_path = args.study_path or os.path.join(settings.DICOM_STORE_DIR, study_uid) + + logger.info(f"Starting sending of study: {study_uid} from {study_path}") + + sender = DicomSender() + + try: + # Send the study + result = sender.send_study(study_path) + + if result['success']: + logger.info(f"Successfully sent study {study_uid}") + logger.info(f"Sent {result['successful_sends']} of {result['total_files']} files") + if result['failed_sends'] > 0: + logger.warning(f"Failed to send {result['failed_sends']} files") + else: + logger.error(f"Failed to send study {study_uid}") + if result.get('error'): + logger.error(f"Error details: {result['error']}") + sys.exit(1) + + except Exception as e: + logger.error(f"Error sending study: {str(e)}") + sys.exit(1) + +def send_series(args): + """ + Send a specific series to destination PACS using C-STORE. + + Args: + args: Command line arguments + """ + study_uid = args.study_uid + series_uid = args.series_uid + + logger.info(f"Starting sending of series: {series_uid} from study: {study_uid}") + + sender = DicomSender() + + try: + # Send the series + result = sender.send_series(study_uid, series_uid) + + if result['success']: + logger.info(f"Successfully sent series {series_uid}") + logger.info(f"Sent {result['successful_sends']} of {result['total_files']} files") + if result['failed_sends'] > 0: + logger.warning(f"Failed to send {result['failed_sends']} files") + else: + logger.error(f"Failed to send series {series_uid}") + if result.get('error'): + logger.error(f"Error details: {result['error']}") + sys.exit(1) + + except Exception as e: + logger.error(f"Error sending series: {str(e)}") + sys.exit(1) + +def send_file(args): + """ + Send a single DICOM file to destination PACS using C-STORE. + + Args: + args: Command line arguments + """ + file_path = args.file_path + + logger.info(f"Starting sending of DICOM file: {file_path}") + + sender = DicomSender() + + try: + # Send the file + result = sender.send_file(file_path) + + if result['success']: + logger.info(f"Successfully sent file {file_path}") + else: + logger.error(f"Failed to send file {file_path}: {result['status']}") + if result['error']: + logger.error(f"Error details: {result['error']}") + sys.exit(1) + + except Exception as e: + logger.error(f"Error sending file: {str(e)}") + sys.exit(1) + +def process_workflow(args): + """ + Process the full workflow: find studies by date range, retrieve them, and send to destination PACS. + Also creates detailed JSON logs for each series. + + Args: + args: Command line arguments + """ + start_date = args.start_date + end_date = args.end_date + log_dir = args.log_dir or settings.JSON_OUTPUT_DIR + skip_existing = args.skip_existing + + # Validate date format + try: + datetime.strptime(start_date, '%Y%m%d') + datetime.strptime(end_date, '%Y%m%d') + except ValueError: + logger.error("Invalid date format. Use YYYYMMDD format.") + sys.exit(1) + + logger.info(f"Starting full workflow process for studies from {start_date} to {end_date}") + + # Create services + finder = DicomFinder() + retriever = DicomRetriever() + sender = DicomSender() + + # Create directory for logs + create_directory_if_not_exists(log_dir) + + # STEP 1: Find studies in the date range + try: + studies = finder.find_studies_by_date_range(start_date, end_date) + logger.info(f"Found {len(studies)} studies between {start_date} and {end_date}") + + # Save studies summary + studies_summary = [{ + 'StudyInstanceUID': study.StudyInstanceUID, + 'StudyDate': getattr(study, 'StudyDate', ''), + 'StudyTime': getattr(study, 'StudyTime', ''), + 'AccessionNumber': getattr(study, 'AccessionNumber', ''), + 'PatientID': getattr(study, 'PatientID', ''), + 'StudyDescription': getattr(study, 'StudyDescription', '') + } for study in studies] + + save_json_data(studies_summary, f"studies_{start_date}_to_{end_date}.json", log_dir) + + # STEP 2, 3, 4: For each study, retrieve, send, and log details + his_log_filename = f"sendtohis_{start_date}_{end_date}.json" + his_log_path = os.path.join("logs", his_log_filename) + his_log = [] + his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json" + his_fail_log_path = os.path.join("logs", his_fail_log_filename) + his_fail_log = [] + + for i, study in enumerate(studies): + accession_number = study.AccessionNumber + study_uid = study.StudyInstanceUID + logger.info(f"Processing study {i+1}/{len(studies)}: {accession_number} with Study_IUID ({study_uid})") + + # Check if we should skip this study + log_file_path = os.path.join(log_dir, f"{study_uid}.json") + if skip_existing and os.path.exists(log_file_path): + logger.info(f"Skipping study {study_uid} - log file already exists") + continue + + # STEP 2: Retrieve the study + try: + retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number) + if not retrieve_result['success']: + logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}") + continue + + logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}") + + # STEP 3: Send the study to destination PACS + send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid)) + if not send_result['success']: + logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}") + + logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS") + + # STEP 4: Find all series and create detailed logs + series_list = finder.find_series_for_study(study_uid, accession_number=accession_number) + + study_date = getattr(study, 'StudyDate', '') + study_time = getattr(study, 'StudyTime', '000000') # Default to midnight if StudyTime is not available + study_datetime = f"{study_date}{study_time}" + + study_log = { + 'Study_IUID': study_uid, + 'AccessionNumber': getattr(study, 'AccessionNumber', ''), + 'PatientID': getattr(study, 'PatientID', ''), # MedrecID + 'StudyDescription': getattr(study, 'StudyDescription', ''), + 'StudyDateTime': study_datetime, + 'CstoreSuccess': send_result['success'], + 'Series': [] + } + + for series in series_list: + series_uid = series.SeriesInstanceUID + + # Get first instance for the series + instance = finder.find_first_instance_for_series(study_uid, series_uid) + + series_info = { + 'Series_IUID': getattr(series, 'SeriesInstanceUID', ''), + 'SeriesNumber': getattr(series, 'SeriesNumber', ''), + 'SeriesDescription': getattr(series, 'SeriesDescription', ''), + 'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''), + 'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else '' + } + + study_log['Series'].append(series_info) + + # Save the detailed log + # save_json_data(study_log, f"{study_uid}.json", log_dir) + # logger.info(f"Created detailed log for study {study_uid}") + + # STEP 5: Send study_log to HIS API + his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}" + try: + response = requests.post(his_url, json=study_log) + if response.status_code == 200 and response.json().get('status') == "OK": + his_log.append(study_log) + logger.info(f"Successfully sent JSON {accession_number} to HIS API") + else: + his_fail_log.append(study_log) + logger.error(f"Failed to send JSON for {accession_number} to HIS API: {response.msg}. Study_IUID: {study_uid}") + except Exception as e: + logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}") + + except Exception as e: + logger.error(f"Error processing study {accession_number}: {str(e)}") + continue + + # Save HIS log + if his_log: + save_json_data(his_log, his_log_filename, "logs") + if his_fail_log: + save_json_data(his_fail_log, his_fail_log_filename, "logs") + + logger.info(f"Completed processing {len(studies)} studies from {start_date} to {end_date}") + + except Exception as e: + logger.error(f"Error during workflow processing: {str(e)}") + sys.exit(1) + +def main(): + """Main function.""" + # Register cleanup handlers for graceful exit + register_exit_handlers() + + # Create necessary directories + create_directory_if_not_exists(settings.JSON_OUTPUT_DIR) + create_directory_if_not_exists(settings.DICOM_STORE_DIR) + + # Register the main DICOM store directory for cleanup + if os.path.exists(settings.DICOM_STORE_DIR): + # Only register top-level directories in the DICOM store directory + for item in os.listdir(settings.DICOM_STORE_DIR): + item_path = os.path.join(settings.DICOM_STORE_DIR, item) + if os.path.isdir(item_path): + register_cleanup_dir(item_path) + + # Parse arguments + args = parse_args() + + if args.command == 'find-studies': + find_studies(args) + elif args.command == 'find-series': + find_series(args) + elif args.command == 'find-instance': + find_instance(args) + elif args.command == 'get-study': + get_study(args) + elif args.command == 'get-series': + get_series(args) + elif args.command == 'send-study': + send_study(args) + elif args.command == 'send-series': + send_series(args) + elif args.command == 'send-file': + send_file(args) + elif args.command == 'process': + process_workflow(args) + else: + logger.error("No command specified. Use --help for options.") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5bdbb23 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +pydicom>=2.3.0 +pynetdicom>=2.0.0 +python-dateutil>=2.8.2 +retry==0.9.2 +tqdm==4.65.0 +requests>=2.28.0 diff --git a/services/dicom_finder.py b/services/dicom_finder.py new file mode 100644 index 0000000..f046f74 --- /dev/null +++ b/services/dicom_finder.py @@ -0,0 +1,272 @@ +""" +DICOM finder service - Implements findscu functionality using pynetdicom. +""" +import os +from datetime import datetime +from pydicom.dataset import Dataset +from pynetdicom import AE, evt, build_role, debug_logger +from pynetdicom.sop_class import ( + PatientRootQueryRetrieveInformationModelFind, + StudyRootQueryRetrieveInformationModelFind, + PatientStudyOnlyQueryRetrieveInformationModelFind +) +from config import settings +from utils.logger import main_logger as logger +from utils.error_handler import DicomQueryError, dicom_retry +from utils.dicom_utils import parse_dicom_date + +# Set debug level +# debug_logger() # Uncomment for detailed debug logs + +class DicomFinder: + """ + Class to perform DICOM C-FIND operations at different levels (STUDY, SERIES, IMAGE). + """ + + def __init__(self, pacs_config=None): + """ + Initialize DicomFinder with PACS settings. + + Args: + pacs_config (dict, optional): PACS configuration dict containing host, port, aet + """ + self.pacs_config = pacs_config or settings.SOURCE_PACS + self.ae = AE(ae_title=settings.SOURCE_AET) + + # Add the supported presentation contexts (Query/Retrieve SOP classes) + self.ae.add_requested_context(StudyRootQueryRetrieveInformationModelFind) + self.ae.add_requested_context(PatientRootQueryRetrieveInformationModelFind) + self.ae.add_requested_context(PatientStudyOnlyQueryRetrieveInformationModelFind) + + logger.info(f"DicomFinder initialized with PACS: {self.pacs_config['aet']}@{self.pacs_config['host']}:{self.pacs_config['port']}") + + @dicom_retry(exception_types=(DicomQueryError, ConnectionError)) + def find_studies_by_date_range(self, start_date, end_date, additional_filters=None): + """ + Find studies within a date range. + + Args: + start_date (str): Start date in YYYYMMDD format + end_date (str): End date in YYYYMMDD format + additional_filters (dict, optional): Additional DICOM attributes to filter by + + Returns: + list: List of study datasets + """ + logger.info(f"Finding studies from {start_date} to {end_date}") + + # Create query dataset + ds = Dataset() + ds.QueryRetrieveLevel = 'STUDY' + + # Required fields (minimal set) + ds.StudyInstanceUID = '' + ds.StudyDate = '' + + # Additional fields we want to retrieve + ds.AccessionNumber = '' + ds.PatientID = '' # MedrecID + ds.StudyDescription = '' + ds.StudyTime = '' + ds.NumberOfStudyRelatedSeries = '' + + # Set date range + ds.StudyDate = f"{start_date}-{end_date}" + + # Add any additional filters + if additional_filters: + for key, value in additional_filters.items(): + setattr(ds, key, value) + + studies = [] + + # Create association + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'] + ) + + if assoc.is_established: + try: + logger.debug("Association established, sending C-FIND request") + + # Send C-FIND request + responses = assoc.send_c_find( + ds, + StudyRootQueryRetrieveInformationModelFind + ) + + # Process responses + for (status, dataset) in responses: + if status and status.Status == 0xFF00: # Pending + if dataset: + studies.append(dataset) + logger.debug(f"Found study: {dataset.StudyInstanceUID}") + elif status and status.Status != 0x0000: # Not success + logger.error(f"C-FIND error: {status}") + + logger.info(f"Found {len(studies)} studies in date range {start_date}-{end_date}") + + except Exception as e: + logger.error(f"Error during C-FIND: {str(e)}") + raise DicomQueryError(f"C-FIND operation failed: {str(e)}") + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + raise DicomQueryError(error_msg) + + return studies + + @dicom_retry(exception_types=(DicomQueryError, ConnectionError)) + def find_series_for_study(self, study_instance_uid, accession_number=None): + """ + Find all series for a given study. + + Args: + study_instance_uid (str): StudyInstanceUID + + Returns: + list: List of series datasets + """ + logger.info(f"Finding series for {accession_number} Study_IUID: {study_instance_uid}") + + # Create query dataset + ds = Dataset() + ds.QueryRetrieveLevel = 'SERIES' + + # Set the StudyInstanceUID filter + ds.StudyInstanceUID = study_instance_uid + + # Required fields + ds.SeriesInstanceUID = '' + + # Additional fields to retrieve + ds.SeriesNumber = '' + ds.SeriesDescription = '' + ds.Modality = '' + ds.NumberOfSeriesRelatedInstances = '' + + series_list = [] + + # Create association + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'] + ) + + if assoc.is_established: + try: + logger.debug(f"Association established, sending SERIES-level C-FIND for study {study_instance_uid}") + + # Send C-FIND request + responses = assoc.send_c_find( + ds, + StudyRootQueryRetrieveInformationModelFind + ) + + # Process responses + for (status, dataset) in responses: + if status and status.Status == 0xFF00: # Pending + if dataset: + series_list.append(dataset) + logger.debug(f"Found series: {dataset.SeriesInstanceUID}") + elif status and status.Status != 0x0000: # Not success + logger.error(f"C-FIND error: {status}") + + logger.info(f"Found {len(series_list)} series for {accession_number} with Study_IUID {study_instance_uid}") + + except Exception as e: + logger.error(f"Error during SERIES C-FIND: {str(e)}") + raise DicomQueryError(f"SERIES C-FIND operation failed: {str(e)}") + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + raise DicomQueryError(error_msg) + + return series_list + + @dicom_retry(exception_types=(DicomQueryError, ConnectionError)) + def find_first_instance_for_series(self, study_instance_uid, series_instance_uid): + """ + Find the first instance (SOP) for a given series. + + Args: + study_instance_uid (str): StudyInstanceUID + series_instance_uid (str): SeriesInstanceUID + + Returns: + Dataset: Dataset of the first instance found or None + """ + logger.info(f"Finding first instance for series: {series_instance_uid}") + + # Create query dataset + ds = Dataset() + ds.QueryRetrieveLevel = 'IMAGE' + + # Set the StudyInstanceUID and SeriesInstanceUID filters + ds.StudyInstanceUID = study_instance_uid + ds.SeriesInstanceUID = series_instance_uid + + # Required fields + ds.SOPInstanceUID = '' + ds.SOPClassUID = '' + ds.InstanceNumber = '' + + # Create association + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'] + ) + + first_instance = None + + if assoc.is_established: + try: + logger.debug(f"Association established, sending IMAGE-level C-FIND for series {series_instance_uid}") + + # Send C-FIND request + responses = assoc.send_c_find( + ds, + StudyRootQueryRetrieveInformationModelFind + ) + + # Process responses - just get the first one + for (status, dataset) in responses: + if status and status.Status == 0xFF00: # Pending + if dataset: + # Found one instance, break + first_instance = dataset + logger.debug(f"Found first instance: {dataset.SOPInstanceUID}") + break + elif status and status.Status != 0x0000: # Not success + logger.error(f"C-FIND error: {status}") + + if first_instance: + logger.info(f"Found first instance {first_instance.SOPInstanceUID} for series {series_instance_uid}") + else: + logger.warning(f"No instances found for series {series_instance_uid}") + + except Exception as e: + logger.error(f"Error during IMAGE C-FIND: {str(e)}") + raise DicomQueryError(f"IMAGE C-FIND operation failed: {str(e)}") + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + raise DicomQueryError(error_msg) + + return first_instance \ No newline at end of file diff --git a/services/dicom_retriever.py b/services/dicom_retriever.py new file mode 100644 index 0000000..201ac02 --- /dev/null +++ b/services/dicom_retriever.py @@ -0,0 +1,359 @@ +""" +DICOM retriever service - Implements getscu functionality using pynetdicom. +""" +import os +import tempfile +from pydicom.dataset import Dataset +from pynetdicom import AE, evt, build_role, StoragePresentationContexts +from pynetdicom.sop_class import ( + StudyRootQueryRetrieveInformationModelGet, + PatientRootQueryRetrieveInformationModelGet +) +from config import settings +from utils.logger import main_logger as logger +from utils.error_handler import DicomRetrieveError, dicom_retry +from utils.dicom_utils import create_directory_if_not_exists +from utils.cleanup import register_cleanup_dir + +class DicomRetriever: + """ + Class to perform DICOM C-GET operations to retrieve studies. + """ + + def __init__(self, pacs_config=None, store_dir=None): + """ + Initialize DicomRetriever with PACS settings. + + Args: + pacs_config (dict, optional): PACS configuration dict containing host, port, aet + store_dir (str, optional): Directory to store retrieved DICOM files + """ + self.pacs_config = pacs_config or settings.SOURCE_PACS + self.store_dir = store_dir or settings.DICOM_STORE_DIR + self.ae = AE(ae_title=settings.SOURCE_AET) + + # Add the Query/Retrieve SOP classes + self.ae.add_requested_context(StudyRootQueryRetrieveInformationModelGet) + self.ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet) + + # Add storage presentation contexts (needed for receiving the images) + # for context in StoragePresentationContexts: + # self.ae.add_requested_context(context.abstract_syntax) + storage_uids = [ + '1.2.840.10008.5.1.4.1.1.1', # CR Storage + '1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage + '1.2.840.10008.5.1.4.1.1.2', # CT Image Storage + '1.2.840.10008.5.1.4.1.1.4', # MR Image Storage + '1.2.840.10008.5.1.4.1.1.7', # Secondary Capture Image Storage + '1.2.840.10008.5.1.4.1.1.6.1', # Ultrasound Image Storage + '1.2.840.10008.5.1.4.1.1.128', # PET Image Storage + '1.2.840.10008.5.1.4.1.1.20', # Nuclear Medicine Image Storage + '1.2.840.10008.5.1.4.1.1.9.1.1', # 12-lead ECG Waveform Storage + '1.2.840.10008.5.1.4.1.1.9.1.2', # General ECG Waveform Storage + ] + self.ext_neg = [] + for uid in storage_uids: + self.ae.add_requested_context(uid) + role = build_role(uid, scp_role=True) + self.ext_neg.append(role) + + # Create storage directory if it doesn't exist + create_directory_if_not_exists(self.store_dir) + + logger.info(f"DicomRetriever initialized with PACS: {self.pacs_config['aet']}@{self.pacs_config['host']}:{self.pacs_config['port']}") + logger.info(f"DICOM files will be stored in: {self.store_dir}") + + def _handle_store(self, event): + """ + Handle C-STORE operations during a C-GET association. + + Args: + event: DICOM C-STORE event + + Returns: + int: Status code (0 = Success) + """ + dataset = event.dataset + + # Get the study, series, and instance UIDs + study_uid = dataset.StudyInstanceUID + series_uid = dataset.SeriesInstanceUID + instance_uid = dataset.SOPInstanceUID + + # Create nested directory structure: StudyInstanceUID/SeriesInstanceUID/ + study_dir = os.path.join(self.store_dir, study_uid) + series_dir = os.path.join(study_dir, series_uid) + create_directory_if_not_exists(series_dir) + + # Save the dataset to file + filename = f"{instance_uid}.dcm" + file_path = os.path.join(series_dir, filename) + + try: + dataset.save_as(file_path, write_like_original=False) + logger.debug(f"Stored instance {instance_uid} to {file_path}") + return 0x0000 # Success status + except Exception as e: + logger.error(f"Error storing instance {instance_uid}: {str(e)}") + return 0xC001 # Failed - Unable to store + + @dicom_retry(exception_types=(DicomRetrieveError, ConnectionError)) + def retrieve_study(self, study_instance_uid, accession_number=None): + """ + Retrieve a complete study using C-GET. + + Args: + study_instance_uid (str): StudyInstanceUID to retrieve + accession_number (str, optional): AccessionNumber to filter the study + + Returns: + dict: Summary of retrieved data with counts and status + """ + logger.info(f"Retrieving study: {accession_number} with Study_IUID {study_instance_uid}") + + # Create study-specific directory + study_dir = os.path.join(self.store_dir, study_instance_uid) + create_directory_if_not_exists(study_dir) + + # Register this directory for cleanup on exit + register_cleanup_dir(study_dir) + + # Create query dataset + ds = Dataset() + ds.QueryRetrieveLevel = 'STUDY' + ds.StudyInstanceUID = study_instance_uid + + # Create association + # Bind the evt.EVT_C_STORE handler to our _handle_store function + handlers = [(evt.EVT_C_STORE, self._handle_store)] + + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'], + evt_handlers=handlers, + ext_neg=self.ext_neg, # No extended negotiation + ) + + result = { + 'success': False, + 'study_uid': study_instance_uid, + 'total_instances': 0, + 'successful_instances': 0, + 'failed_instances': 0, + 'status': '', + 'error': '', + 'study_dir': study_dir + } + + if assoc.is_established: + try: + logger.debug(f"Association established, sending C-GET request for study {study_instance_uid}") + + # Send C-GET request + responses = assoc.send_c_get( + ds, + StudyRootQueryRetrieveInformationModelGet ) + + # Track progress + for (status, identifier) in responses: + if status: + # Update status information + result['total_instances'] = getattr(status, 'NumberOfRemainingSuboperations', 0) + \ + getattr(status, 'NumberOfCompletedSuboperations', 0) + \ + getattr(status, 'NumberOfFailedSuboperations', 0) + \ + getattr(status, 'NumberOfWarningSuboperations', 0) + + result['successful_instances'] = getattr(status, 'NumberOfCompletedSuboperations', 0) + result['failed_instances'] = getattr(status, 'NumberOfFailedSuboperations', 0) + + # Log progress for large retrievals + if status.Status == 0xFF00: # Pending + if hasattr(status, 'NumberOfRemainingSuboperations'): + logger.debug(f"C-GET progress: {result['successful_instances']}/{result['total_instances']} instances received") + + # Check if operation was successful + if status.Status == 0x0000: # Success + result['success'] = True + result['status'] = 'Success' + elif status.Status == 0xB000: # Warning + result['success'] = True + result['status'] = 'Warning - Suboperations Complete with Failures' + else: + result['status'] = f"Error - Status code: {status.Status:04x}" + + logger.info(f"C-GET completed: {result['successful_instances']}/{result['total_instances']} instances retrieved") + + except Exception as e: + error_msg = f"Error during C-GET: {str(e)}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomRetrieveError(error_msg) + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomRetrieveError(error_msg) + + return result + + @dicom_retry(exception_types=(DicomRetrieveError, ConnectionError)) + def retrieve_series(self, study_instance_uid, series_instance_uid): + """ + Retrieve a specific series using C-GET. + + Args: + study_instance_uid (str): StudyInstanceUID + series_instance_uid (str): SeriesInstanceUID to retrieve + + Returns: + dict: Summary of retrieved data with counts and status + """ + logger.info(f"Retrieving series: {series_instance_uid} from study: {study_instance_uid}") + + # Create series-specific directory + study_dir = os.path.join(self.store_dir, study_instance_uid) + series_dir = os.path.join(study_dir, series_instance_uid) + create_directory_if_not_exists(series_dir) + + # Register directories for cleanup on exit + register_cleanup_dir(study_dir) + register_cleanup_dir(series_dir) + + # Create query dataset + ds = Dataset() + ds.QueryRetrieveLevel = 'SERIES' + ds.StudyInstanceUID = study_instance_uid + ds.SeriesInstanceUID = series_instance_uid + + # Create association + # Bind the evt.EVT_C_STORE handler to our _handle_store function + handlers = [(evt.EVT_C_STORE, self._handle_store)] + + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'], + evt_handlers=handlers, + ext_neg=self.ext_neg, # No extended negotiation + ) + + result = { + 'success': False, + 'study_uid': study_instance_uid, + 'series_uid': series_instance_uid, + 'total_instances': 0, + 'successful_instances': 0, + 'failed_instances': 0, + 'status': '', + 'error': '', + 'series_dir': series_dir + } + + if assoc.is_established: + try: + logger.debug(f"Association established, sending C-GET request for series {series_instance_uid}") + + # Send C-GET request + responses = assoc.send_c_get( + ds, + StudyRootQueryRetrieveInformationModelGet ) + + # Track progress + for (status, identifier) in responses: + if status: + # Update status information + result['total_instances'] = getattr(status, 'NumberOfRemainingSuboperations', 0) + \ + getattr(status, 'NumberOfCompletedSuboperations', 0) + \ + getattr(status, 'NumberOfFailedSuboperations', 0) + \ + getattr(status, 'NumberOfWarningSuboperations', 0) + + result['successful_instances'] = getattr(status, 'NumberOfCompletedSuboperations', 0) + result['failed_instances'] = getattr(status, 'NumberOfFailedSuboperations', 0) + + # Check if operation was successful + if status.Status == 0x0000: # Success + result['success'] = True + result['status'] = 'Success' + elif status.Status == 0xB000: # Warning + result['success'] = True + result['status'] = 'Warning - Suboperations Complete with Failures' + else: + result['status'] = f"Error - Status code: {status.Status:04x}" + + logger.info(f"C-GET completed: {result['successful_instances']}/{result['total_instances']} instances retrieved") + + except Exception as e: + error_msg = f"Error during C-GET: {str(e)}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomRetrieveError(error_msg) + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomRetrieveError(error_msg) + + return result + + def get_retrieved_file_count(self, study_uid, series_uid=None): + """ + Count the number of files retrieved for a study or series. + + Args: + study_uid (str): StudyInstanceUID + series_uid (str, optional): SeriesInstanceUID. If None, count all files in the study. + + Returns: + int: Number of DICOM files found + """ + study_dir = os.path.join(self.store_dir, study_uid) + + if not os.path.exists(study_dir): + logger.warning(f"Study directory does not exist: {study_dir}") + return 0 + + file_count = 0 + + if series_uid: + # Count files in a specific series + series_dir = os.path.join(study_dir, series_uid) + if os.path.exists(series_dir): + # Count only .dcm files + file_count = len([f for f in os.listdir(series_dir) if f.endswith('.dcm')]) + else: + # Count files in all series of the study + for series_name in os.listdir(study_dir): + series_dir = os.path.join(study_dir, series_name) + if os.path.isdir(series_dir): + file_count += len([f for f in os.listdir(series_dir) if f.endswith('.dcm')]) + + return file_count + + def is_study_complete(self, study_uid, expected_instance_count=None): + """ + Check if a study has been completely retrieved. + + Args: + study_uid (str): StudyInstanceUID + expected_instance_count (int, optional): Expected number of instances. + + Returns: + bool: True if study is complete, False otherwise + """ + if expected_instance_count is None: + logger.warning("No expected instance count provided, can't determine if study is complete") + return False + + actual_count = self.get_retrieved_file_count(study_uid) + + logger.info(f"Study {study_uid} completeness: {actual_count}/{expected_instance_count} instances") + return actual_count >= expected_instance_count \ No newline at end of file diff --git a/services/dicom_sender.py b/services/dicom_sender.py new file mode 100644 index 0000000..f152c73 --- /dev/null +++ b/services/dicom_sender.py @@ -0,0 +1,358 @@ +""" +DICOM sender service - Implements storescu functionality using pynetdicom. +""" +import os +import glob +import pydicom +import shutil +from pydicom.dataset import Dataset +from pynetdicom import AE, StoragePresentationContexts, evt +from config import settings +from utils.logger import main_logger as logger +from utils.error_handler import DicomStoreError, dicom_retry +from utils.dicom_utils import create_directory_if_not_exists + +class DicomSender: + """ + Class to perform DICOM C-STORE operations to send DICOM studies to a destination PACS. + """ + + def __init__(self, pacs_config=None): + """ + Initialize DicomSender with PACS settings. + + Args: + pacs_config (dict, optional): Destination PACS configuration dict containing host, port, aet + """ + self.pacs_config = pacs_config or settings.DESTINATION_PACS + self.ae = AE(ae_title=settings.SOURCE_AET) + + # Add storage presentation contexts (all standard transfer syntaxes for each SOP class) + for context in StoragePresentationContexts: + self.ae.add_requested_context(context.abstract_syntax) + + logger.info(f"DicomSender initialized with destination PACS: {self.pacs_config['aet']}@{self.pacs_config['host']}:{self.pacs_config['port']}") + + def _remove_dicom_files(self, directory): + """ + Remove all DICOM files in the specified directory. + + Args: + directory (str): Path to the directory to remove files from. + """ + try: + if os.path.exists(directory): + shutil.rmtree(directory) + logger.info(f"Removed DICOM files from directory: {directory}") + except Exception as e: + logger.error(f"Failed to remove DICOM files from {directory}: {str(e)}") + + @dicom_retry(exception_types=(DicomStoreError, ConnectionError)) + def send_study(self, study_path): + """ + Send all DICOM files in a study directory to the destination PACS. + + Args: + study_path (str): Path to study directory containing series subdirectories with DICOM files + + Returns: + dict: Summary of store operations with counts and status + """ + logger.info(f"Sending all DICOM files from study: {study_path}") + + # Check if study directory exists + if not os.path.exists(study_path) or not os.path.isdir(study_path): + error_msg = f"Study directory does not exist: {study_path}" + logger.error(error_msg) + raise DicomStoreError(error_msg) + + # Find all DICOM files in the study directory (recursively) + dicom_files = [] + for root, _, files in os.walk(study_path): + for file in files: + if file.endswith('.dcm'): + dicom_files.append(os.path.join(root, file)) + + if not dicom_files: + logger.warning(f"No DICOM files found in study directory: {study_path}") + return { + 'success': False, + 'total_files': 0, + 'successful_sends': 0, + 'failed_sends': 0, + 'error': 'No DICOM files found' + } + + logger.info(f"Found {len(dicom_files)} DICOM files to send") + + # Create association + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'] + ) + + result = { + 'success': False, + 'total_files': len(dicom_files), + 'successful_sends': 0, + 'failed_sends': 0, + 'failures': [], + 'study_path': study_path + } + + if assoc.is_established: + try: + logger.debug(f"Association established with {self.pacs_config['aet']}, sending DICOM files") + + for file_path in dicom_files: + try: + # Read the DICOM file + dataset = pydicom.dcmread(file_path, force=True) + + # Get identifying information for logging + sop_instance_uid = getattr(dataset, 'SOPInstanceUID', 'Unknown') + series_instance_uid = getattr(dataset, 'SeriesInstanceUID', 'Unknown') + + # Send the dataset + status = assoc.send_c_store(dataset) + + if status and status.Status == 0x0000: # Success + logger.debug(f"Successfully sent: {os.path.basename(file_path)}") + result['successful_sends'] += 1 + else: + logger.error(f"Failed to send {os.path.basename(file_path)}: {status}") + result['failed_sends'] += 1 + result['failures'].append({ + 'file': file_path, + 'sop_instance_uid': sop_instance_uid, + 'series_instance_uid': series_instance_uid, + 'status': str(status) if status else 'Unknown error' + }) + except Exception as e: + logger.error(f"Error processing file {file_path}: {str(e)}") + result['failed_sends'] += 1 + result['failures'].append({ + 'file': file_path, + 'error': str(e) + }) + + # Update success flag based on results + if result['successful_sends'] == result['total_files']: + result['success'] = True + elif result['successful_sends'] > 0: + # Partial success + result['success'] = True + logger.warning(f"Partial success: {result['successful_sends']}/{result['total_files']} files sent") + + logger.info(f"C-STORE completed: {result['successful_sends']}/{result['total_files']} files sent") + + # Remove DICOM files after sending + self._remove_dicom_files(study_path) + + except Exception as e: + error_msg = f"Error during C-STORE operations: {str(e)}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomStoreError(error_msg) + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomStoreError(error_msg) + + return result + + @dicom_retry(exception_types=(DicomStoreError, ConnectionError)) + def send_series(self, study_uid, series_uid, base_dir=None): + """ + Send all DICOM files in a specific series to the destination PACS. + + Args: + study_uid (str): StudyInstanceUID + series_uid (str): SeriesInstanceUID to send + base_dir (str, optional): Base directory where DICOM files are stored. Defaults to settings.DICOM_STORE_DIR. + + Returns: + dict: Summary of store operations with counts and status + """ + base_dir = base_dir or settings.DICOM_STORE_DIR + series_path = os.path.join(base_dir, study_uid, series_uid) + + logger.info(f"Sending DICOM files from series: {series_uid}") + + # Check if series directory exists + if not os.path.exists(series_path) or not os.path.isdir(series_path): + error_msg = f"Series directory does not exist: {series_path}" + logger.error(error_msg) + raise DicomStoreError(error_msg) + + # Find all DICOM files in the series directory + dicom_files = glob.glob(os.path.join(series_path, '*.dcm')) + + if not dicom_files: + logger.warning(f"No DICOM files found in series directory: {series_path}") + return { + 'success': False, + 'total_files': 0, + 'successful_sends': 0, + 'failed_sends': 0, + 'error': 'No DICOM files found' + } + + logger.info(f"Found {len(dicom_files)} DICOM files to send") + + # Create association + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'] + ) + + result = { + 'success': False, + 'total_files': len(dicom_files), + 'successful_sends': 0, + 'failed_sends': 0, + 'failures': [], + 'series_path': series_path + } + + if assoc.is_established: + try: + logger.debug(f"Association established with {self.pacs_config['aet']}, sending DICOM files") + + for file_path in dicom_files: + try: + # Read the DICOM file + dataset = pydicom.dcmread(file_path, force=True) + + # Send the dataset + status = assoc.send_c_store(dataset) + + if status and status.Status == 0x0000: # Success + logger.debug(f"Successfully sent: {os.path.basename(file_path)}") + result['successful_sends'] += 1 + else: + logger.error(f"Failed to send {os.path.basename(file_path)}: {status}") + result['failed_sends'] += 1 + result['failures'].append({ + 'file': file_path, + 'status': str(status) if status else 'Unknown error' + }) + except Exception as e: + logger.error(f"Error processing file {file_path}: {str(e)}") + result['failed_sends'] += 1 + result['failures'].append({ + 'file': file_path, + 'error': str(e) + }) + + # Update success flag based on results + if result['successful_sends'] == result['total_files']: + result['success'] = True + elif result['successful_sends'] > 0: + # Partial success + result['success'] = True + logger.warning(f"Partial success: {result['successful_sends']}/{result['total_files']} files sent") + + logger.info(f"C-STORE completed: {result['successful_sends']}/{result['total_files']} files sent") + + # Remove DICOM files after sending + self._remove_dicom_files(series_path) + + except Exception as e: + error_msg = f"Error during C-STORE operations: {str(e)}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomStoreError(error_msg) + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomStoreError(error_msg) + + return result + + @dicom_retry(exception_types=(DicomStoreError, ConnectionError)) + def send_file(self, file_path): + """ + Send a single DICOM file to the destination PACS. + + Args: + file_path (str): Path to DICOM file to send + + Returns: + dict: Summary of store operation with status + """ + logger.info(f"Sending DICOM file: {file_path}") + + # Check if file exists + if not os.path.exists(file_path) or not os.path.isfile(file_path): + error_msg = f"DICOM file does not exist: {file_path}" + logger.error(error_msg) + raise DicomStoreError(error_msg) + + # Create association + assoc = self.ae.associate( + self.pacs_config['host'], + self.pacs_config['port'], + ae_title=self.pacs_config['aet'] + ) + + result = { + 'success': False, + 'file_path': file_path, + 'status': '', + 'error': '' + } + + if assoc.is_established: + try: + logger.debug(f"Association established with {self.pacs_config['aet']}, sending file") + + # Read the DICOM file + dataset = pydicom.dcmread(file_path, force=True) + + # Get identifying information for logging + sop_instance_uid = getattr(dataset, 'SOPInstanceUID', 'Unknown') + series_instance_uid = getattr(dataset, 'SeriesInstanceUID', 'Unknown') + study_instance_uid = getattr(dataset, 'StudyInstanceUID', 'Unknown') + + # Send the dataset + status = assoc.send_c_store(dataset) + + if status and status.Status == 0x0000: # Success + result['success'] = True + result['status'] = 'Success' + logger.info(f"Successfully sent file: {file_path}") + logger.debug(f"File details: Study={study_instance_uid}, Series={series_instance_uid}, SOP={sop_instance_uid}") + else: + result['status'] = f"Failed - Status: {status}" if status else "Failed - Unknown error" + logger.error(f"Failed to send file: {file_path}") + logger.error(f"Status: {status}") + + except Exception as e: + error_msg = f"Error sending file {file_path}: {str(e)}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomStoreError(error_msg) + finally: + # Release the association + assoc.release() + logger.debug("Association released") + else: + error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}" + logger.error(error_msg) + result['error'] = error_msg + raise DicomStoreError(error_msg) + + return result \ No newline at end of file diff --git a/utils/cleanup.py b/utils/cleanup.py new file mode 100644 index 0000000..a5f29db --- /dev/null +++ b/utils/cleanup.py @@ -0,0 +1,73 @@ +""" +DICOM file cleanup utility to ensure temporary DICOM files are removed even on script termination. +""" +import os +import shutil +import atexit +import signal +from config import settings +from utils.logger import main_logger as logger + +# Global registry of directories to clean up +_cleanup_dirs = set() + +def register_cleanup_dir(directory): + """ + Register a directory for cleanup when the script exits. + + Args: + directory (str): Path to the directory to be cleaned up + """ + global _cleanup_dirs + if os.path.exists(directory) and os.path.isdir(directory): + _cleanup_dirs.add(directory) + logger.debug(f"Registered directory for cleanup: {directory}") + +def cleanup_dicom_files(): + """ + Remove all registered DICOM directories. + This function is called when the script exits, ensuring cleanup even on abnormal termination. + """ + global _cleanup_dirs + logger.info(f"Running cleanup for {len(_cleanup_dirs)} directories") + + for directory in _cleanup_dirs: + try: + if os.path.exists(directory): + shutil.rmtree(directory) + logger.info(f"Cleanup: Removed DICOM files from {directory}") + except Exception as e: + logger.error(f"Cleanup: Failed to remove DICOM files from {directory}: {str(e)}") + + # Clear the registry after cleanup + _cleanup_dirs.clear() + +def register_exit_handlers(): + """ + Register cleanup handlers for various exit scenarios. + """ + # Register for normal exit + atexit.register(cleanup_dicom_files) + + # Register for signals + signal.signal(signal.SIGINT, signal_handler) # Ctrl+C + signal.signal(signal.SIGTERM, signal_handler) # Termination request + + logger.info("Registered cleanup handlers for script exit") + +def signal_handler(sig, frame): + """ + Handle termination signals by performing cleanup before exit. + + Args: + sig: Signal number + frame: Current stack frame + """ + signal_name = { + signal.SIGINT: 'SIGINT (Ctrl+C)', + signal.SIGTERM: 'SIGTERM' + }.get(sig, f'Signal {sig}') + + logger.info(f"Received {signal_name}, cleaning up...") + cleanup_dicom_files() + exit(0) \ No newline at end of file diff --git a/utils/dicom_utils.py b/utils/dicom_utils.py new file mode 100644 index 0000000..d60c794 --- /dev/null +++ b/utils/dicom_utils.py @@ -0,0 +1,86 @@ +""" +General utility functions for DICOM operations. +""" +import os +import json +from datetime import datetime +from utils.logger import main_logger as logger + +def create_directory_if_not_exists(directory_path): + """ + Create directory if it doesn't exist. + + Args: + directory_path (str): Path to directory + """ + if not os.path.exists(directory_path): + os.makedirs(directory_path, exist_ok=True) + logger.info(f"Created directory: {directory_path}") + +def save_json_data(data, filename, directory): + """ + Save data as JSON to specified directory. + + Args: + data (dict): Data to save + filename (str): Filename without path + directory (str): Directory to save to + + Returns: + str: Full path to saved file + """ + create_directory_if_not_exists(directory) + file_path = os.path.join(directory, filename) + + with open(file_path, 'w') as f: + json.dump(data, f, indent=2) + + logger.info(f"Saved JSON data to {file_path}") + return file_path + +def parse_dicom_date(date_str): + """ + Parse DICOM date format (YYYYMMDD) to Python datetime. + + Args: + date_str (str): DICOM formatted date string + + Returns: + datetime: Python datetime object + """ + if not date_str or len(date_str) != 8: + return None + + try: + return datetime.strptime(date_str, '%Y%m%d') + except ValueError: + logger.error(f"Invalid DICOM date format: {date_str}") + return None + +def parse_dicom_datetime(datetime_str): + """ + Parse DICOM datetime format (YYYYMMDDHHMMSS) to Python datetime. + + Args: + datetime_str (str): DICOM formatted datetime string + + Returns: + datetime: Python datetime object + """ + if not datetime_str: + return None + + # Handle various DICOM datetime formats + try: + if len(datetime_str) == 14: # YYYYMMDDHHMMSS + return datetime.strptime(datetime_str, '%Y%m%d%H%M%S') + elif len(datetime_str) == 12: # YYYYMMDDHHMM + return datetime.strptime(datetime_str, '%Y%m%d%H%M') + elif len(datetime_str) == 8: # YYYYMMDD + return datetime.strptime(datetime_str, '%Y%m%d') + else: + logger.warning(f"Unexpected DICOM datetime format: {datetime_str}") + return None + except ValueError: + logger.error(f"Invalid DICOM datetime format: {datetime_str}") + return None \ No newline at end of file diff --git a/utils/error_handler.py b/utils/error_handler.py new file mode 100644 index 0000000..e4c2173 --- /dev/null +++ b/utils/error_handler.py @@ -0,0 +1,53 @@ +""" +Error handling and retry mechanisms for DICOM operations. +""" +import time +import functools +from retry import retry +from utils.logger import main_logger as logger +from config import settings + +# Define custom exceptions +class DicomConnectionError(Exception): + """Error when failing to establish DICOM association.""" + pass + +class DicomQueryError(Exception): + """Error when C-FIND operation fails.""" + pass + +class DicomRetrieveError(Exception): + """Error when C-GET operation fails.""" + pass + +class DicomStoreError(Exception): + """Error when C-STORE operation fails.""" + pass + +def dicom_retry(exception_types=(Exception,)): + """ + Decorator to retry DICOM operations with exponential backoff. + + Args: + exception_types: Tuple of exception types to retry on. + + Returns: + Decorated function with retry logic. + """ + def decorator(func): + @functools.wraps(func) + @retry( + exceptions=exception_types, + tries=settings.MAX_RETRIES, + delay=settings.RETRY_DELAY, + backoff=2, + logger=logger + ) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except exception_types as e: + logger.error(f"Error in {func.__name__}: {str(e)}") + raise + return wrapper + return decorator \ No newline at end of file diff --git a/utils/logger.py b/utils/logger.py new file mode 100644 index 0000000..240569a --- /dev/null +++ b/utils/logger.py @@ -0,0 +1,54 @@ +""" +Logging configuration and utility functions. +""" +import os +import logging +from logging.handlers import RotatingFileHandler +import sys +from config import settings + +def setup_logger(name, log_file=None): + """ + Set up a logger with both console and file handlers. + + Args: + name (str): Logger name + log_file (str, optional): Log file path. If None, uses the default from settings. + + Returns: + logging.Logger: Configured logger + """ + if log_file is None: + log_file = settings.LOG_FILE + + # Create the logs directory if it doesn't exist + os.makedirs(os.path.dirname(log_file), exist_ok=True) + + logger = logging.getLogger(name) + logger.setLevel(getattr(logging, settings.LOG_LEVEL)) + + # Create formatter + formatter = logging.Formatter(settings.LOG_FORMAT, datefmt='%Y-%m-%d %H:%M:%S') + + # Create console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(getattr(logging, settings.LOG_LEVEL)) + console_handler.setFormatter(formatter) + + # Create file handler + file_handler = RotatingFileHandler( + log_file, + maxBytes=settings.MAX_LOG_SIZE, + backupCount=settings.BACKUP_COUNT + ) + file_handler.setLevel(getattr(logging, settings.LOG_LEVEL)) + file_handler.setFormatter(formatter) + + # Add handlers to logger + logger.addHandler(console_handler) + logger.addHandler(file_handler) + + return logger + +# Create main application logger +main_logger = setup_logger("LOG") \ No newline at end of file