# Source: pydicom-migrasi-clarity/services/dicom_retriever.py
# Snapshot: 2025-05-09 09:00:46 +07:00 (359 lines, 15 KiB, Python)

"""
DICOM retriever service - Implements getscu functionality using pynetdicom.
"""
import os
import tempfile
from pydicom.dataset import Dataset
from pynetdicom import AE, evt, build_role, StoragePresentationContexts
from pynetdicom.sop_class import (
StudyRootQueryRetrieveInformationModelGet,
PatientRootQueryRetrieveInformationModelGet
)
from config import settings
from utils.logger import main_logger as logger
from utils.error_handler import DicomRetrieveError, dicom_retry
from utils.dicom_utils import create_directory_if_not_exists
from utils.cleanup import register_cleanup_dir
class DicomRetriever:
    """
    Class to perform DICOM C-GET operations to retrieve studies.

    The retriever acts as a C-GET SCU towards the configured PACS. During a
    C-GET the peer sends the matching instances back over the same
    association as C-STORE sub-operations, so this class also handles
    ``evt.EVT_C_STORE`` and writes every received instance to
    ``<store_dir>/<StudyInstanceUID>/<SeriesInstanceUID>/<SOPInstanceUID>.dcm``.
    """

    # Storage SOP classes this retriever is willing to receive during C-GET.
    # An explicit list is used instead of every entry of
    # StoragePresentationContexts.
    # NOTE(review): presumably to stay below the limit on the number of
    # requested presentation contexts per association - confirm.
    _STORAGE_SOP_CLASS_UIDS = (
        '1.2.840.10008.5.1.4.1.1.1',      # CR Storage
        '1.2.840.10008.5.1.4.1.1.1.1',    # Digital X-Ray Image Storage
        '1.2.840.10008.5.1.4.1.1.2',      # CT Image Storage
        '1.2.840.10008.5.1.4.1.1.4',      # MR Image Storage
        '1.2.840.10008.5.1.4.1.1.7',      # Secondary Capture Image Storage
        '1.2.840.10008.5.1.4.1.1.6.1',    # Ultrasound Image Storage
        '1.2.840.10008.5.1.4.1.1.128',    # PET Image Storage
        '1.2.840.10008.5.1.4.1.1.20',     # Nuclear Medicine Image Storage
        '1.2.840.10008.5.1.4.1.1.9.1.1',  # 12-lead ECG Waveform Storage
        '1.2.840.10008.5.1.4.1.1.9.1.2',  # General ECG Waveform Storage
    )

    # Status codes checked on C-GET responses / returned from C-STORE.
    _STATUS_SUCCESS = 0x0000
    _STATUS_PENDING = 0xFF00
    _STATUS_WARNING = 0xB000
    _STATUS_STORE_FAILED = 0xC001

    def __init__(self, pacs_config=None, store_dir=None):
        """
        Initialize DicomRetriever with PACS settings.

        Args:
            pacs_config (dict, optional): PACS configuration dict containing
                host, port, aet. Defaults to ``settings.SOURCE_PACS``.
            store_dir (str, optional): Directory to store retrieved DICOM
                files. Defaults to ``settings.DICOM_STORE_DIR``.
        """
        self.pacs_config = pacs_config or settings.SOURCE_PACS
        self.store_dir = store_dir or settings.DICOM_STORE_DIR
        self.ae = AE(ae_title=settings.SOURCE_AET)

        # Add the Query/Retrieve SOP classes
        self.ae.add_requested_context(StudyRootQueryRetrieveInformationModelGet)
        self.ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)

        # For every storage SOP class request a presentation context and an
        # SCP/SCU role selection item with scp_role=True, so that the peer is
        # allowed to send us C-STORE requests on this association.
        self.ext_neg = []
        for uid in self._STORAGE_SOP_CLASS_UIDS:
            self.ae.add_requested_context(uid)
            self.ext_neg.append(build_role(uid, scp_role=True))

        # Create storage directory if it doesn't exist
        create_directory_if_not_exists(self.store_dir)
        logger.info(f"DicomRetriever initialized with PACS: {self.pacs_config['aet']}@{self.pacs_config['host']}:{self.pacs_config['port']}")
        logger.info(f"DICOM files will be stored in: {self.store_dir}")

    @staticmethod
    def _uid_to_path_component(uid):
        """
        Return ``uid`` as a string that is safe to use as one path component.

        The UIDs come from the remote peer, so anything that could make
        ``os.path.join`` leave the store directory is rejected.

        Args:
            uid: UID value taken from a received dataset

        Returns:
            str: The UID as plain text

        Raises:
            ValueError: If the UID is empty, is ``.``/``..`` or contains a
                path separator or NUL character
        """
        text = str(uid)
        if not text or text in ('.', '..'):
            raise ValueError(f"UID is not usable as a path component: {text!r}")
        if any(ch in text for ch in ('/', '\\', '\x00')):
            raise ValueError(f"UID is not usable as a path component: {text!r}")
        return text

    def _handle_store(self, event):
        """
        Handle C-STORE operations during a C-GET association.

        Args:
            event: DICOM C-STORE event

        Returns:
            int: Status code (0x0000 = Success, 0xC001 = unable to store)
        """
        # Placeholder so the error log below works even if decoding the
        # dataset or reading the UIDs is what failed.
        instance_uid = '<unknown>'
        try:
            dataset = event.dataset
            # The decoded dataset carries no file meta information; attach the
            # one built from the C-STORE request so the file is written with
            # the transfer syntax that was actually negotiated.
            dataset.file_meta = event.file_meta

            instance_uid = self._uid_to_path_component(dataset.SOPInstanceUID)
            study_uid = self._uid_to_path_component(dataset.StudyInstanceUID)
            series_uid = self._uid_to_path_component(dataset.SeriesInstanceUID)

            # Create nested directory structure: StudyInstanceUID/SeriesInstanceUID/
            series_dir = os.path.join(self.store_dir, study_uid, series_uid)
            create_directory_if_not_exists(series_dir)

            # Save the dataset to file
            file_path = os.path.join(series_dir, f"{instance_uid}.dcm")
            dataset.save_as(file_path, write_like_original=False)
            logger.debug(f"Stored instance {instance_uid} to {file_path}")
            return self._STATUS_SUCCESS
        except Exception as e:
            # Report the failure to the peer as a status instead of letting
            # the exception escape from the event handler.
            logger.error(f"Error storing instance {instance_uid}: {str(e)}")
            return self._STATUS_STORE_FAILED

    def _apply_c_get_status(self, status, result):
        """
        Fold one non-empty C-GET response status into ``result``.

        Args:
            status: Status dataset yielded by ``send_c_get``
            result (dict): Summary dict that is updated in place
        """
        remaining = getattr(status, 'NumberOfRemainingSuboperations', 0)
        completed = getattr(status, 'NumberOfCompletedSuboperations', 0)
        failed = getattr(status, 'NumberOfFailedSuboperations', 0)
        warning = getattr(status, 'NumberOfWarningSuboperations', 0)

        result['total_instances'] = remaining + completed + failed + warning
        result['successful_instances'] = completed
        result['failed_instances'] = failed

        code = status.Status
        if code == self._STATUS_PENDING:
            # Intermediate response: sub-operations are still running, so
            # this must not be recorded as an error.
            result['status'] = 'Pending'
            if hasattr(status, 'NumberOfRemainingSuboperations'):
                logger.debug(f"C-GET progress: {result['successful_instances']}/{result['total_instances']} instances received")
        elif code == self._STATUS_SUCCESS:
            result['success'] = True
            result['status'] = 'Success'
        elif code == self._STATUS_WARNING:
            result['success'] = True
            result['status'] = 'Warning - Suboperations Complete with Failures'
        else:
            result['status'] = f"Error - Status code: {code:04x}"

    def _run_c_get(self, ds, result, target):
        """
        Associate with the PACS, send one C-GET request and collect the outcome.

        Args:
            ds (Dataset): C-GET identifier (query dataset)
            result (dict): Summary dict that is updated in place
            target (str): Description of what is retrieved, used in log output

        Returns:
            dict: The updated ``result``

        Raises:
            DicomRetrieveError: If the association cannot be established or
                an exception occurs while the C-GET is processed
        """
        # Bind the evt.EVT_C_STORE handler to our _handle_store function
        handlers = [(evt.EVT_C_STORE, self._handle_store)]
        assoc = self.ae.associate(
            self.pacs_config['host'],
            self.pacs_config['port'],
            ae_title=self.pacs_config['aet'],
            evt_handlers=handlers,
            ext_neg=self.ext_neg,  # SCP/SCU role selection for the storage contexts
        )

        if not assoc.is_established:
            error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}"
            logger.error(error_msg)
            result['error'] = error_msg
            raise DicomRetrieveError(error_msg)

        try:
            logger.debug(f"Association established, sending C-GET request for {target}")
            responses = assoc.send_c_get(
                ds,
                StudyRootQueryRetrieveInformationModelGet,
            )
            for status, _identifier in responses:
                if not status:
                    # An empty status means no valid response was received
                    # (timeout, abort or invalid response). Record it so the
                    # caller does not get a failure without any explanation.
                    error_msg = "Connection timed out, was aborted or received an invalid response during C-GET"
                    logger.error(error_msg)
                    result['success'] = False
                    result['status'] = 'Error - No valid response from peer'
                    result['error'] = error_msg
                    continue
                self._apply_c_get_status(status, result)
            logger.info(f"C-GET completed: {result['successful_instances']}/{result['total_instances']} instances retrieved")
        except Exception as e:
            error_msg = f"Error during C-GET: {str(e)}"
            logger.error(error_msg)
            result['error'] = error_msg
            # Keep the original exception as the cause for debugging.
            raise DicomRetrieveError(error_msg) from e
        finally:
            # Release the association
            assoc.release()
            logger.debug("Association released")
        return result

    @dicom_retry(exception_types=(DicomRetrieveError, ConnectionError))
    def retrieve_study(self, study_instance_uid, accession_number=None):
        """
        Retrieve a complete study using C-GET.

        Args:
            study_instance_uid (str): StudyInstanceUID to retrieve
            accession_number (str, optional): AccessionNumber of the study.
                Only used in log output; the request itself is keyed on
                StudyInstanceUID alone.

        Returns:
            dict: Summary of retrieved data with counts and status

        Raises:
            DicomRetrieveError: If the association cannot be established or
                the C-GET fails with an exception
        """
        logger.info(f"Retrieving study: {accession_number} with Study_IUID {study_instance_uid}")

        # Create study-specific directory
        study_dir = os.path.join(self.store_dir, study_instance_uid)
        create_directory_if_not_exists(study_dir)
        # Register this directory for cleanup on exit
        register_cleanup_dir(study_dir)

        # Create query dataset
        ds = Dataset()
        ds.QueryRetrieveLevel = 'STUDY'
        ds.StudyInstanceUID = study_instance_uid

        result = {
            'success': False,
            'study_uid': study_instance_uid,
            'total_instances': 0,
            'successful_instances': 0,
            'failed_instances': 0,
            'status': '',
            'error': '',
            'study_dir': study_dir,
        }
        return self._run_c_get(ds, result, f"study {study_instance_uid}")

    @dicom_retry(exception_types=(DicomRetrieveError, ConnectionError))
    def retrieve_series(self, study_instance_uid, series_instance_uid):
        """
        Retrieve a specific series using C-GET.

        Args:
            study_instance_uid (str): StudyInstanceUID
            series_instance_uid (str): SeriesInstanceUID to retrieve

        Returns:
            dict: Summary of retrieved data with counts and status

        Raises:
            DicomRetrieveError: If the association cannot be established or
                the C-GET fails with an exception
        """
        logger.info(f"Retrieving series: {series_instance_uid} from study: {study_instance_uid}")

        # Create series-specific directory
        study_dir = os.path.join(self.store_dir, study_instance_uid)
        series_dir = os.path.join(study_dir, series_instance_uid)
        create_directory_if_not_exists(series_dir)
        # Register directories for cleanup on exit
        register_cleanup_dir(study_dir)
        register_cleanup_dir(series_dir)

        # Create query dataset
        ds = Dataset()
        ds.QueryRetrieveLevel = 'SERIES'
        ds.StudyInstanceUID = study_instance_uid
        ds.SeriesInstanceUID = series_instance_uid

        result = {
            'success': False,
            'study_uid': study_instance_uid,
            'series_uid': series_instance_uid,
            'total_instances': 0,
            'successful_instances': 0,
            'failed_instances': 0,
            'status': '',
            'error': '',
            'series_dir': series_dir,
        }
        return self._run_c_get(ds, result, f"series {series_instance_uid}")

    @staticmethod
    def _count_dcm_files(directory):
        """Return the number of entries in ``directory`` whose name ends with ``.dcm``."""
        return sum(1 for name in os.listdir(directory) if name.endswith('.dcm'))

    def get_retrieved_file_count(self, study_uid, series_uid=None):
        """
        Count the number of files retrieved for a study or series.

        Args:
            study_uid (str): StudyInstanceUID
            series_uid (str, optional): SeriesInstanceUID. If None, count all
                files in the study.

        Returns:
            int: Number of DICOM files found (0 if the directory is missing)
        """
        study_dir = os.path.join(self.store_dir, study_uid)
        # isdir rather than exists: a regular file at this path cannot be
        # listed and is treated like a missing study directory.
        if not os.path.isdir(study_dir):
            logger.warning(f"Study directory does not exist: {study_dir}")
            return 0

        if series_uid:
            # Count files in a specific series
            series_dir = os.path.join(study_dir, series_uid)
            if os.path.isdir(series_dir):
                return self._count_dcm_files(series_dir)
            return 0

        # Count files in all series of the study
        file_count = 0
        for series_name in os.listdir(study_dir):
            series_dir = os.path.join(study_dir, series_name)
            if os.path.isdir(series_dir):
                file_count += self._count_dcm_files(series_dir)
        return file_count

    def is_study_complete(self, study_uid, expected_instance_count=None):
        """
        Check if a study has been completely retrieved.

        Args:
            study_uid (str): StudyInstanceUID
            expected_instance_count (int, optional): Expected number of
                instances.

        Returns:
            bool: True if at least ``expected_instance_count`` files are on
                disk, False otherwise or when no expected count is given
        """
        if expected_instance_count is None:
            logger.warning("No expected instance count provided, can't determine if study is complete")
            return False

        actual_count = self.get_retrieved_file_count(study_uid)
        logger.info(f"Study {study_uid} completeness: {actual_count}/{expected_instance_count} instances")
        return actual_count >= expected_instance_count