diff --git a/main.py b/main.py index 7ba1120..e66aee5 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,7 @@ from utils.cleanup import register_exit_handlers, register_cleanup_dir from services.dicom_finder import DicomFinder from services.dicom_retriever import DicomRetriever from services.dicom_sender import DicomSender +import signal def parse_args(): """Parse command line arguments.""" @@ -362,6 +363,142 @@ def send_file(args): logger.error(f"Error sending file: {str(e)}") sys.exit(1) +def process_single_study(study_uid, accession_number=None, log_dir=None): + """ + Process a single study: retrieve it, send to destination PACS, + and create detailed JSON logs. + + Args: + study_uid: Study Instance UID + accession_number: Optional accession number + log_dir: Directory to save logs + + Returns: + dict: Result information + """ + log_dir = log_dir or settings.JSON_OUTPUT_DIR + + # Create services + finder = DicomFinder() + retriever = DicomRetriever() + sender = DicomSender() + + # First, get the study metadata with a query if not provided + if not accession_number: + study_metadata = finder.find_study_by_uid(study_uid) + if not study_metadata: + logger.error(f"Study not found: {study_uid}") + return {'success': False, 'error': 'Study not found'} + accession_number = getattr(study_metadata, 'AccessionNumber', '') + else: + study_metadata = None + + logger.info(f"Processing study: {accession_number} with Study_IUID ({study_uid})") + + # STEP 1: Retrieve the study + retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number) + if not retrieve_result['success']: + logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}") + return {'success': False, 'error': f"Failed to retrieve study: {retrieve_result['status']}"} + + logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}") + + # STEP 2: Send the study to destination PACS + send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid)) + if not send_result['success']: + logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}") + + logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS") + + # STEP 3: Find all series and create detailed logs + series_list = finder.find_series_for_study(study_uid, accession_number=accession_number) + + # Get metadata from study_metadata if available, otherwise from the first element of study list + if study_metadata: + study_date = getattr(study_metadata, 'StudyDate', '') + study_time = getattr(study_metadata, 'StudyTime', '000000') + patient_id = getattr(study_metadata, 'PatientID', '') + study_description = getattr(study_metadata, 'StudyDescription', '') + else: + # We need to query for it again + study_details = finder.find_study_by_uid(study_uid) + study_date = getattr(study_details, 'StudyDate', '') if study_details else '' + study_time = getattr(study_details, 'StudyTime', '000000') if study_details else '000000' + patient_id = getattr(study_details, 'PatientID', '') if study_details else '' + study_description = getattr(study_details, 'StudyDescription', '') if study_details else '' + + study_datetime = f"{study_date}{study_time}" + + study_log = { + 'Study_IUID': study_uid, + 'AccessionNumber': accession_number, + 'PatientID': patient_id, + 'StudyDescription': study_description, + 'StudyDateTime': study_datetime, + 'CstoreSuccess': send_result['success'], + 'Series': [] + } + + for series in series_list: + series_uid = series.SeriesInstanceUID + + # Get first instance for the series + instance = finder.find_first_instance_for_series(study_uid, series_uid) + + series_info = { + 'Series_IUID': getattr(series, 'SeriesInstanceUID', ''), + 'SeriesNumber': getattr(series, 'SeriesNumber', ''), + 'SeriesDescription': getattr(series, 'SeriesDescription', ''), + 'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''), + 'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else '' + } + + study_log['Series'].append(series_info) + + # STEP 4: Send study_log to HIS API + his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}" + try: + headers = { + 'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==', + 'Content-Type': 'application/json' + } + response = requests.post(his_url, json=study_log, headers=headers) + + # Add response data to the log + study_log['HisApiResponse'] = { + 'StatusCode': response.status_code, + 'ResponseText': response.text, + 'Success': False, + 'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') + } + + # Safer JSON parsing + try: + response_data = response.json() if response.content else {} + study_log['HisApiResponse']['ResponseData'] = response_data + + if response.status_code == 200 and response_data.get('OK') == "1": + study_log['HisApiResponse']['Success'] = True + logger.info(f"Successfully sent JSON {accession_number} to HIS API") + return {'success': True, 'study_log': study_log, 'his_success': True} + else: + error_msg = response_data.get('message', 'Unknown error') + logger.error(f"Failed to send JSON for {accession_number} to HIS API: {error_msg}. Study_IUID: {study_uid}") + return {'success': True, 'study_log': study_log, 'his_success': False} + except ValueError: + study_log['HisApiResponse']['ParseError'] = "Invalid JSON response" + logger.error(f"Failed to parse response for {accession_number}, invalid JSON response. Status code: {response.status_code}") + return {'success': True, 'study_log': study_log, 'his_success': False} + + except Exception as e: + study_log['HisApiResponse'] = { + 'Success': False, + 'Error': str(e), + 'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') + } + logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}") + return {'success': True, 'study_log': study_log, 'his_success': False, 'error': str(e)} + def process_workflow(args): """ Process the full workflow: find studies by date range, retrieve them, and send to destination PACS. @@ -387,11 +524,10 @@ def process_workflow(args): # Create services finder = DicomFinder() - retriever = DicomRetriever() - sender = DicomSender() # Create directory for logs create_directory_if_not_exists(log_dir) + create_directory_if_not_exists("logs") # STEP 1: Find studies in the date range try: @@ -411,12 +547,30 @@ def process_workflow(args): save_json_data(studies_summary, f"studies_{start_date}_to_{end_date}.json", log_dir) # STEP 2, 3, 4: For each study, retrieve, send, and log details - his_log_filename = f"sendtohis_{start_date}_{end_date}.json" - his_log_path = os.path.join("logs", his_log_filename) his_log = [] - his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json" - his_fail_log_path = os.path.join("logs", his_fail_log_filename) his_fail_log = [] + his_log_filename = f"sendtohis_{start_date}_{end_date}.json" + his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json" + + # Register signal handler for SIGINT + original_sigint_handler = signal.getsignal(signal.SIGINT) + + def sigint_handler(sig, frame): + logger.warning("Process interrupted by user. Saving logs before exiting...") + # Save HIS logs + if his_log: + save_json_data(his_log, his_log_filename, "logs") + logger.info(f"Saved {len(his_log)} successful logs to {his_log_filename}") + if his_fail_log: + save_json_data(his_fail_log, his_fail_log_filename, "logs") + logger.info(f"Saved {len(his_fail_log)} failed logs to {his_fail_log_filename}") + # Restore original signal handler and raise KeyboardInterrupt + signal.signal(signal.SIGINT, original_sigint_handler) + sys.exit(1) + + # Set up our custom signal handler + signal.signal(signal.SIGINT, sigint_handler) + for i, study in enumerate(studies): accession_number = study.AccessionNumber @@ -429,86 +583,35 @@ def process_workflow(args): logger.info(f"Skipping study {study_uid} - log file already exists") continue - # STEP 2: Retrieve the study try: - retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number) - if not retrieve_result['success']: - logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}") + result = process_single_study(study_uid, accession_number, log_dir) + + if not result['success']: + logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}") continue - logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}") + # Record results for HIS logs + if result.get('his_success', False): + his_log.append(result['study_log']) + else: + his_fail_log.append(result['study_log']) - # STEP 3: Send the study to destination PACS - send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid)) - if not send_result['success']: - logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}") - - logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS") - - # STEP 4: Find all series and create detailed logs - series_list = finder.find_series_for_study(study_uid, accession_number=accession_number) - - study_date = getattr(study, 'StudyDate', '') - study_time = getattr(study, 'StudyTime', '000000') # Default to midnight if StudyTime is not available - study_datetime = f"{study_date}{study_time}" - - study_log = { - 'Study_IUID': study_uid, - 'AccessionNumber': getattr(study, 'AccessionNumber', ''), - 'PatientID': getattr(study, 'PatientID', ''), # MedrecID - 'StudyDescription': getattr(study, 'StudyDescription', ''), - 'StudyDateTime': study_datetime, - 'CstoreSuccess': send_result['success'], - 'Series': [] - } - - for series in series_list: - series_uid = series.SeriesInstanceUID + # Save progress incrementally every 5 studies + if (i + 1) % 5 == 0: + if his_log: + save_json_data(his_log, his_log_filename, "logs") + if his_fail_log: + save_json_data(his_fail_log, his_fail_log_filename, "logs") + logger.info(f"Saved progress after processing {i+1} studies") - # Get first instance for the series - instance = finder.find_first_instance_for_series(study_uid, series_uid) - - series_info = { - 'Series_IUID': getattr(series, 'SeriesInstanceUID', ''), - 'SeriesNumber': getattr(series, 'SeriesNumber', ''), - 'SeriesDescription': getattr(series, 'SeriesDescription', ''), - 'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''), - 'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else '' - } - - study_log['Series'].append(series_info) - - # Save the detailed log - # save_json_data(study_log, f"{study_uid}.json", log_dir) - # logger.info(f"Created detailed log for study {study_uid}") - - # STEP 5: Send study_log to HIS API - his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}" - try: - # Header tembak API HIS - headers = { - 'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==', - 'Content-Type': 'application/json' - } - response = requests.post(his_url, json=study_log, headers=headers) - - # Gunakan ini untuk development - # response = requests.post(his_url, json=study_log) - - if response.status_code == 200 and response.json().get('OK') == "1": - his_log.append(study_log) - logger.info(f"Successfully sent JSON {accession_number} to HIS API") - else: - his_fail_log.append(study_log) - logger.error(f"Failed to send JSON for {accession_number} to HIS API: {response.msg}. Study_IUID: {study_uid}") - except Exception as e: - logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}") - except Exception as e: logger.error(f"Error processing study {accession_number}: {str(e)}") continue - # Save HIS log + # Restore original signal handler + signal.signal(signal.SIGINT, original_sigint_handler) + + # Final Save HIS logs if his_log: save_json_data(his_log, his_log_filename, "logs") if his_fail_log: @@ -522,8 +625,7 @@ def process_workflow(args): def process_workflow_by_study(args): """ - Process the full workflow for a specific study: retrieve it, send to destination PACS, - and create detailed JSON logs. + Process the full workflow for a specific study. Args: args: Command line arguments @@ -533,103 +635,64 @@ def process_workflow_by_study(args): logger.info(f"Starting full workflow process for study: {study_uid}") - # Create services - finder = DicomFinder() - retriever = DicomRetriever() - sender = DicomSender() - # Create directory for logs create_directory_if_not_exists(log_dir) + create_directory_if_not_exists("logs") - # Process the study - try: - # First, get the study metadata with a query - study_metadata = finder.find_study_by_uid(study_uid) - - if not study_metadata: - logger.error(f"Study not found: {study_uid}") - sys.exit(1) - - accession_number = getattr(study_metadata, 'AccessionNumber', '') - logger.info(f"Processing study: {accession_number} with Study_IUID ({study_uid})") - - # STEP 1: Retrieve the study - retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number) - if not retrieve_result['success']: - logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}") - sys.exit(1) - - logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}") - - # STEP 2: Send the study to destination PACS - send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid)) - if not send_result['success']: - logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}") - - logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS") - - # STEP 3: Find all series and create detailed logs - series_list = finder.find_series_for_study(study_uid, accession_number=accession_number) - - study_date = getattr(study_metadata, 'StudyDate', '') - study_time = getattr(study_metadata, 'StudyTime', '000000') # Default to midnight if StudyTime is not available - study_datetime = f"{study_date}{study_time}" + # Register signal handler for SIGINT + original_sigint_handler = signal.getsignal(signal.SIGINT) + + # Create empty containers for logs + his_log = [] + his_fail_log = [] + his_log_filename = f"sendtohis_study_{study_uid}.json" + his_fail_log_filename = f"fail_sendtohis_study_{study_uid}.json" - study_log = { - 'Study_IUID': study_uid, - 'AccessionNumber': getattr(study_metadata, 'AccessionNumber', ''), - 'PatientID': getattr(study_metadata, 'PatientID', ''), # MedrecID - 'StudyDescription': getattr(study_metadata, 'StudyDescription', ''), - 'StudyDateTime': study_datetime, - 'CstoreSuccess': send_result['success'], - 'Series': [] - } - - for series in series_list: - series_uid = series.SeriesInstanceUID + def sigint_handler(sig, frame): + logger.warning("Process interrupted by user. Saving logs before exiting...") + # Save individual study log + if 'result' in locals() and result.get('success', False): + save_json_data(result['study_log'], f"{study_uid}.json", log_dir) - # Get first instance for the series - instance = finder.find_first_instance_for_series(study_uid, series_uid) - - series_info = { - 'Series_IUID': getattr(series, 'SeriesInstanceUID', ''), - 'SeriesNumber': getattr(series, 'SeriesNumber', ''), - 'SeriesDescription': getattr(series, 'SeriesDescription', ''), - 'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''), - 'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else '' - } - - study_log['Series'].append(series_info) - - # STEP 4: Send study_log to HIS API - his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}" - try: - # Header tembak API HIS - headers = { - 'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==', - 'Content-Type': 'application/json' - } - response = requests.post(his_url, json=study_log, headers=headers) - - # Pakai ini untuk Development - # response = requests.post(his_url, json=study_log) - - if response.status_code == 200 and response.json().get('OK') == "1": - logger.info(f"Successfully sent JSON {accession_number} to HIS API") - # Save successful log - save_json_data([study_log], f"sendtohis_study_{study_uid}.json", "logs") + # Save HIS logs + if result.get('his_success', False): + save_json_data([result['study_log']], his_log_filename, "logs") + logger.info(f"Saved successful log to {his_log_filename}") else: - logger.error(f"Failed to send JSON for {accession_number} to HIS API. Study_IUID: {study_uid}") - # Save failed log - save_json_data([study_log], f"fail_sendtohis_study_{study_uid}.json", "logs") - except Exception as e: - logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}") + save_json_data([result['study_log']], his_fail_log_filename, "logs") + logger.info(f"Saved failed log to {his_fail_log_filename}") + + # Restore original signal handler and exit + signal.signal(signal.SIGINT, original_sigint_handler) + sys.exit(1) + + # Set up our custom signal handler + signal.signal(signal.SIGINT, sigint_handler) + + try: + result = process_single_study(study_uid, log_dir=log_dir) + if not result['success']: + logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}") + sys.exit(1) + + # Save individual study log + save_json_data(result['study_log'], f"{study_uid}.json", log_dir) + + # Save logs + if result.get('his_success', False): + save_json_data([result['study_log']], his_log_filename, "logs") + else: + save_json_data([result['study_log']], his_fail_log_filename, "logs") + logger.info(f"Completed processing study: {study_uid}") except Exception as e: logger.error(f"Error during study processing: {str(e)}") sys.exit(1) + finally: + # Restore original signal handler + signal.signal(signal.SIGINT, original_sigint_handler) def main(): """Main function.""" diff --git a/services/dicom_retriever.py b/services/dicom_retriever.py index 201ac02..90fa839 100644 --- a/services/dicom_retriever.py +++ b/services/dicom_retriever.py @@ -39,6 +39,7 @@ class DicomRetriever: # Add storage presentation contexts (needed for receiving the images) # for context in StoragePresentationContexts: # self.ae.add_requested_context(context.abstract_syntax) + storage_uids = [ '1.2.840.10008.5.1.4.1.1.1', # CR Storage '1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage diff --git a/services/dicom_sender.py b/services/dicom_sender.py index f152c73..eb99417 100644 --- a/services/dicom_sender.py +++ b/services/dicom_sender.py @@ -6,11 +6,15 @@ import glob import pydicom import shutil from pydicom.dataset import Dataset -from pynetdicom import AE, StoragePresentationContexts, evt +from pynetdicom import AE, StoragePresentationContexts, evt, build_role from config import settings from utils.logger import main_logger as logger from utils.error_handler import DicomStoreError, dicom_retry from utils.dicom_utils import create_directory_if_not_exists +from pynetdicom.sop_class import ( + StudyRootQueryRetrieveInformationModelGet, + PatientRootQueryRetrieveInformationModelGet +) class DicomSender: """ @@ -27,9 +31,32 @@ class DicomSender: self.pacs_config = pacs_config or settings.DESTINATION_PACS self.ae = AE(ae_title=settings.SOURCE_AET) - # Add storage presentation contexts (all standard transfer syntaxes for each SOP class) - for context in StoragePresentationContexts: - self.ae.add_requested_context(context.abstract_syntax) + # Add the Query/Retrieve SOP classes + self.ae.add_requested_context(StudyRootQueryRetrieveInformationModelGet) + self.ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet) + + # Add storage presentation contexts (needed for receiving the images) + # for context in StoragePresentationContexts: + # self.ae.add_requested_context(context.abstract_syntax) + + storage_uids = [ + '1.2.840.10008.5.1.4.1.1.1', # CR Storage + '1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage + '1.2.840.10008.5.1.4.1.1.2', # CT Image Storage + '1.2.840.10008.5.1.4.1.1.4', # MR Image Storage + '1.2.840.10008.5.1.4.1.1.7', # Secondary Capture Image Storage + '1.2.840.10008.5.1.4.1.1.6.1', # Ultrasound Image Storage + '1.2.840.10008.5.1.4.1.1.128', # PET Image Storage + '1.2.840.10008.5.1.4.1.1.20', # Nuclear Medicine Image Storage + '1.2.840.10008.5.1.4.1.1.9.1.1', # 12-lead ECG Waveform Storage + '1.2.840.10008.5.1.4.1.1.9.1.2', # General ECG Waveform Storage + ] + self.ext_neg = [] + for uid in storage_uids: + self.ae.add_requested_context(uid) + role = build_role(uid, scp_role=True) + self.ext_neg.append(role) + logger.info(f"DicomSender initialized with destination PACS: {self.pacs_config['aet']}@{self.pacs_config['host']}:{self.pacs_config['port']}")