#!/usr/bin/env python3
"""
Process module for DICOM migration API service.

This module contains the core processing logic for migrating DICOM studies
through the API: locating a study by accession number, retrieving it,
forwarding it to the destination PACS, reporting it to the HIS API and
removing the temporary files afterwards.
"""
import os
import sys
import json
import shutil
from datetime import datetime

import requests

# Add parent directory to sys.path to allow importing from the main project
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import settings
from services.dicom_finder import DicomFinder
from services.dicom_retriever import DicomRetriever
from services.dicom_sender import DicomSender
from utils.logger import setup_logger
from utils.dicom_utils import save_json_data, create_directory_if_not_exists

# Create API process logger
process_logger = setup_logger("api_process", "logs/api_process.log")

# Layout of every timestamp written into responses and study logs.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# (connect, read) timeout in seconds for the HIS API call. Without a timeout
# `requests` waits indefinitely, so an unresponsive HIS would block the worker.
_HIS_REQUEST_TIMEOUT = (10, 60)

# Value sent in the 'id' header of every HIS API request.
# NOTE(review): this looks like an API identifier/credential committed to
# source control - confirm, and consider loading it from configuration.
_HIS_API_ID = 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ=='


def _now_str():
    """Return the current local time formatted with ``_TIMESTAMP_FORMAT``."""
    return datetime.now().strftime(_TIMESTAMP_FORMAT)


def _safe_get_attr(obj, attr_name, default=''):
    """Get attribute value as a string, ensuring None is converted to default."""
    value = getattr(obj, attr_name, default)
    return default if value is None else str(value)


def cleanup_study_directory(study_uid):
    """
    Explicitly clean up the DICOM files for a study after processing.

    The directory removed is ``<settings.DICOM_STORE_DIR>/<study_uid>``.
    An empty UID, ``"."``, ``".."`` or a UID containing a path separator is
    rejected: joining such a value onto the store directory would point
    ``shutil.rmtree`` at the store root or at a location outside it.

    Args:
        study_uid: Study Instance UID to clean up

    Returns:
        bool: True if the directory was removed or did not exist, False if
            the UID was rejected or removal raised an error
    """
    uid_text = '' if study_uid is None else str(study_uid)
    if not uid_text or uid_text in ('.', '..') or os.path.basename(uid_text) != uid_text:
        process_logger.error(
            f"Refusing to clean up study directory for invalid study UID: {study_uid!r}"
        )
        return False

    study_dir = os.path.join(settings.DICOM_STORE_DIR, study_uid)
    process_logger.info(f"Cleaning up DICOM files for study: {study_uid} at {study_dir}")

    try:
        if os.path.exists(study_dir):
            shutil.rmtree(study_dir)
            process_logger.info(f"Successfully removed study directory: {study_dir}")
            return True

        process_logger.warning(f"Study directory does not exist: {study_dir}")
        return True  # Return True since there's nothing to clean
    except Exception as e:
        process_logger.error(f"Error cleaning up study directory {study_dir}: {str(e)}")
        return False


def _collect_series_info(finder, study_uid, accession_number):
    """Return one summary dict per series of the study, as queried via *finder*."""
    series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)

    series_entries = []
    # A finder that reports "no series" as None is treated like an empty list.
    for series in series_list or []:
        series_uid = series.SeriesInstanceUID

        # Get first instance for the series
        instance = finder.find_first_instance_for_series(study_uid, series_uid)

        series_entries.append({
            'Series_IUID': _safe_get_attr(series, 'SeriesInstanceUID'),
            'SeriesNumber': _safe_get_attr(series, 'SeriesNumber'),
            'SeriesDescription': _safe_get_attr(series, 'SeriesDescription'),
            'NumberOfInstances': _safe_get_attr(series, 'NumberOfSeriesRelatedInstances'),
            'SOP_IUID': _safe_get_attr(instance, 'SOPInstanceUID') if instance else '',
        })
    return series_entries


def _send_study_log_to_his(study_log, accession_number, study_uid, details):
    """POST *study_log* to the HIS API, recording the outcome in *study_log* and *details*."""
    process_logger.info(f"Sending study data to HIS API for accession: {accession_number}")
    his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"

    try:
        headers = {
            'id': _HIS_API_ID,
            'Content-Type': 'application/json',
        }
        response = requests.post(
            his_url,
            json=study_log,
            headers=headers,
            timeout=_HIS_REQUEST_TIMEOUT,
        )

        # Add response data to the log
        study_log['HisApiResponse'] = {
            'StatusCode': response.status_code,
            'ResponseText': response.text,
            'Success': False,
            'Timestamp': _now_str(),
        }
        details["his_api_status_code"] = response.status_code

        # Parse the response
        try:
            response_data_his = response.json() if response.content else {}
            study_log['HisApiResponse']['ResponseData'] = response_data_his

            # The call counts as successful only for HTTP 200 with OK == "1".
            if response.status_code == 200 and response_data_his.get('OK') == "1":
                study_log['HisApiResponse']['Success'] = True
                process_logger.info(f"Successfully sent JSON {accession_number} to HIS API")
                details["his_integration_success"] = True
                details["steps_completed"].append("his_api_success")
            else:
                error_msg = response_data_his.get('MSG', response_data_his.get('message', 'Unknown error'))
                process_logger.error(f"Failed to send JSON for {accession_number} to HIS API: {error_msg}. Study_IUID: {study_uid}")
                details["his_integration_success"] = False
                details["his_error"] = error_msg
                details["steps_completed"].append("his_api_failed")
        except ValueError:
            study_log['HisApiResponse']['ParseError'] = "Invalid JSON response"
            process_logger.error(f"Failed to parse response for {accession_number}, invalid JSON response. Status code: {response.status_code}")
            details["his_integration_success"] = False
            details["his_error"] = "Invalid JSON response from HIS API"
            details["steps_completed"].append("his_api_failed")
    except Exception as e:
        # Connection errors and timeouts land here; they are recorded rather
        # than raised so the caller can still clean up and save the log.
        study_log['HisApiResponse'] = {
            'Success': False,
            'Error': str(e),
            'Timestamp': _now_str(),
        }
        process_logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
        details["his_integration_success"] = False
        details["his_error"] = str(e)
        details["steps_completed"].append("his_api_error")


def process_study_by_accession(accession_number):
    """
    Process a study by its accession number for the API service.

    This function:
    1. Finds the study UID based on accession number
    2. Retrieves the study's DICOM files via DicomRetriever
    3. Sends the DICOM files to the destination PACS via DicomSender
    4. Builds a detailed study log with one entry per series
    5. Sends the study log to the HIS API
    6. Cleans up temporary DICOM files
    7. Saves the study log as JSON in settings.JSON_OUTPUT_DIR

    A failed send (step 3) does not stop processing; the remaining steps
    still run so the failure is logged and reported. The final "success"
    flag mirrors the send result only - a HIS API failure is reported in the
    message and details but does not flip "success" to False.

    Args:
        accession_number: The accession number of the study to process

    Returns:
        dict: Result with "success" (bool), "message" (str) and "details"
            (dict with per-step information, including "steps_completed",
            "process_start_time" and "process_end_time")
    """
    process_logger.info(f"Processing study with accession number: {accession_number}")

    # Initialize response data; `details` is the same dict object that is
    # stored under response_data["details"].
    details = {
        "accession_number": accession_number,
        "process_start_time": _now_str(),
        "steps_completed": [],
    }
    response_data = {
        "success": False,
        "message": "",
        "details": details,
    }

    # Bound up front so the error handler can tell whether a study was found.
    study_uid = None

    try:
        # STEP 1: Find study by accession number to get the Study UID
        process_logger.info(f"Finding study with accession number: {accession_number}")
        finder = DicomFinder()
        study = finder.find_study_by_accession(accession_number)

        if not study:
            process_logger.error(f"Study not found for accession number: {accession_number}")
            response_data["message"] = f"Study not found for accession number: {accession_number}"
            details["process_end_time"] = _now_str()
            return response_data

        study_uid = study.StudyInstanceUID
        patient_id = getattr(study, 'PatientID', '')
        study_description = getattr(study, 'StudyDescription', '')
        study_date = getattr(study, 'StudyDate', '')
        study_time = getattr(study, 'StudyTime', '000000')

        details["study_uid"] = study_uid
        details["patient_id"] = patient_id
        details["study_description"] = study_description

        process_logger.info(f"Found study with UID: {study_uid} for accession number: {accession_number}")
        details["steps_completed"].append("study_found")

        # STEP 2: Retrieve the study using C-GET
        process_logger.info(f"Retrieving study: {study_uid}")
        retriever = DicomRetriever()
        retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)

        if not retrieve_result['success']:
            process_logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
            cleanup_study_directory(study_uid)  # Clean up any partial files
            response_data["message"] = f"Failed to retrieve study: {retrieve_result['status']}"
            details["process_end_time"] = _now_str()
            return response_data

        process_logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")
        details["instances_retrieved"] = retrieve_result['successful_instances']
        details["steps_completed"].append("study_retrieved")

        # STEP 3: Send the study to destination PACS using C-STORE
        process_logger.info(f"Sending study {study_uid} to destination PACS")
        sender = DicomSender()
        send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))

        details["files_sent"] = send_result['successful_sends']
        details["total_files"] = send_result['total_files']

        if not send_result['success']:
            process_logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
            # We continue processing even if send fails, to log the details
            details["c_store_success"] = False
            details["c_store_error"] = send_result.get('error', 'Unknown error')
        else:
            process_logger.info(f"Successfully sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS")
            details["c_store_success"] = True
            details["steps_completed"].append("study_sent")

        # STEP 4: Create detailed log for the study
        process_logger.info(f"Creating detailed log for study: {study_uid}")
        study_log = {
            'Study_IUID': study_uid,
            'AccessionNumber': accession_number,
            'PatientID': patient_id,
            'StudyDescription': study_description,
            'StudyDateTime': f"{study_date}{study_time}",
            'CstoreSuccess': send_result['success'],
            'Series': _collect_series_info(finder, study_uid, accession_number),
        }

        details["series_count"] = len(study_log['Series'])
        details["steps_completed"].append("log_created")

        # STEP 5: Send study_log to HIS API
        _send_study_log_to_his(study_log, accession_number, study_uid, details)

        # STEP 6: Clean up DICOM files
        if cleanup_study_directory(study_uid):
            process_logger.info(f"Successfully cleaned up DICOM files for study: {study_uid}")
            details["cleanup_success"] = True
            details["steps_completed"].append("cleanup_success")
        else:
            process_logger.warning(f"Failed to clean up DICOM files for study: {study_uid}")
            details["cleanup_success"] = False
            details["steps_completed"].append("cleanup_failed")

        # STEP 7: Save the study log for reference
        log_dir = settings.JSON_OUTPUT_DIR
        create_directory_if_not_exists(log_dir)
        log_filename = f"api_{accession_number}.json"
        save_json_data(study_log, log_filename, log_dir)
        process_logger.info(f"Saved study log to {os.path.join(log_dir, log_filename)}")

        # Set final response based on C-STORE success (primary success metric)
        response_data["success"] = send_result['success']

        if send_result['success']:
            if details.get("his_integration_success", False):
                response_data["message"] = f"Berhasil migrasi file DICOM Accession Number: {accession_number}"
            else:
                his_error = details.get("his_error", "Unknown HIS error")
                response_data["message"] = f"Berhasil migrasi file DICOM tetapi gagal update HIS: {his_error}"
        else:
            response_data["message"] = f"Gagal migrasi file DICOM: {send_result.get('error', 'Unknown error')}"

        process_logger.info(f"Completed processing study with accession number: {accession_number}")
        details["process_end_time"] = _now_str()

        return response_data

    except Exception as e:
        process_logger.exception(f"Unexpected error processing study {accession_number}: {str(e)}")

        # Try to clean up if we have a study_uid
        if study_uid is not None:
            cleanup_study_directory(study_uid)

        response_data["message"] = f"Error during migration: {str(e)}"
        details["error"] = str(e)
        details["process_end_time"] = _now_str()
        return response_data