Files
pydicom-migrasi-clarity/api-app/process.py
2025-07-11 16:03:43 +07:00

285 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Process module for DICOM migration API service.
This module contains the core processing logic for migrating DICOM studies through the API.
"""
import os
import sys
import json
import shutil
from datetime import datetime
import requests
# Put the parent of this file's directory on sys.path so the project-level
# packages imported below (config, services, utils) can be resolved. This has to
# run before those imports, which is why it sits in the middle of the import block.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import settings
from services.dicom_finder import DicomFinder
from services.dicom_retriever import DicomRetriever
from services.dicom_sender import DicomSender
from utils.logger import setup_logger
from utils.dicom_utils import save_json_data, create_directory_if_not_exists
# Module-wide logger for the API processing flow, created once at import time.
# NOTE(review): the second argument looks like the log file path, relative to the
# process working directory — confirm against utils.logger.setup_logger.
process_logger = setup_logger("api_process", "logs/api_process.log")
def cleanup_study_directory(study_uid):
    """
    Remove the on-disk DICOM files of a study after processing.

    The study directory is ``<settings.DICOM_STORE_DIR>/<study_uid>``. This
    function does not raise for bad UIDs or filesystem errors: each failure is
    logged and reported through the return value.

    Args:
        study_uid: Study Instance UID whose directory should be removed.

    Returns:
        bool: True if the directory was removed or did not exist. False if the
        UID is empty, resolves outside (or to) the store directory, or the
        removal itself failed.
    """
    # An empty UID would make os.path.join() return the store root itself, and
    # rmtree would then delete every study on disk. Refuse instead.
    if not study_uid:
        process_logger.error("Refusing to clean up study directory: empty study UID")
        return False

    study_dir = os.path.join(settings.DICOM_STORE_DIR, str(study_uid))
    process_logger.info(f"Cleaning up DICOM files for study: {study_uid} at {study_dir}")

    # The UID originates from a remote system; make sure it cannot point at the
    # store root or escape it (e.g. via ".." components or an absolute path).
    store_root = os.path.abspath(settings.DICOM_STORE_DIR)
    resolved_dir = os.path.abspath(study_dir)
    try:
        inside_store = (
            resolved_dir != store_root
            and os.path.commonpath([store_root, resolved_dir]) == store_root
        )
    except ValueError:
        # commonpath() raises when the paths cannot be compared
        # (e.g. different drives on Windows).
        inside_store = False
    if not inside_store:
        process_logger.error(
            f"Refusing to clean up path outside DICOM store directory: {study_dir}"
        )
        return False

    try:
        if not os.path.exists(study_dir):
            process_logger.warning(f"Study directory does not exist: {study_dir}")
            return True  # Nothing to clean counts as success
        shutil.rmtree(study_dir)
    except OSError as e:
        process_logger.error(f"Error cleaning up study directory {study_dir}: {str(e)}")
        return False

    process_logger.info(f"Successfully removed study directory: {study_dir}")
    return True
# Upper bound (seconds) for the HIS HTTP call. Without a timeout,
# requests.post() can block indefinitely if the HIS server stops responding.
_HIS_REQUEST_TIMEOUT_SECONDS = 60


def _timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def _safe_get_attr(obj, attr_name, default=''):
    """Return the attribute as a string, or *default* if it is missing or None."""
    value = getattr(obj, attr_name, default)
    return default if value is None else str(value)


def _build_series_entries(finder, study_uid, series_list):
    """Build one log entry per series, including the first instance's SOP UID."""
    entries = []
    # Tolerate a finder that returns None instead of an empty list.
    for series in series_list or []:
        series_uid = series.SeriesInstanceUID
        # Only the first instance of each series is recorded in the log.
        instance = finder.find_first_instance_for_series(study_uid, series_uid)
        entries.append({
            'Series_IUID': _safe_get_attr(series, 'SeriesInstanceUID'),
            'SeriesNumber': _safe_get_attr(series, 'SeriesNumber'),
            'SeriesDescription': _safe_get_attr(series, 'SeriesDescription'),
            'NumberOfInstances': _safe_get_attr(series, 'NumberOfSeriesRelatedInstances'),
            'SOP_IUID': _safe_get_attr(instance, 'SOPInstanceUID') if instance else '',
        })
    return entries


def _send_study_log_to_his(study_log, accession_number, study_uid, details):
    """POST study_log to the HIS API; record the outcome in study_log and details."""
    process_logger.info(f"Sending study data to HIS API for accession: {accession_number}")
    his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
    steps = details["steps_completed"]
    try:
        # SECURITY NOTE(review): this 'id' header value looks like a credential or
        # client identifier hard-coded in source. Consider moving it to
        # configuration / a secret store. Left unchanged to keep behavior.
        headers = {
            'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==',
            'Content-Type': 'application/json'
        }
        response = requests.post(
            his_url,
            json=study_log,
            headers=headers,
            timeout=_HIS_REQUEST_TIMEOUT_SECONDS,
        )
        # Keep the raw HIS reply in the saved log; Success is flipped below
        # only when the reply is confirmed OK.
        study_log['HisApiResponse'] = {
            'StatusCode': response.status_code,
            'ResponseText': response.text,
            'Success': False,
            'Timestamp': _timestamp()
        }
        details["his_api_status_code"] = response.status_code

        try:
            his_payload = response.json() if response.content else {}
        except ValueError:
            study_log['HisApiResponse']['ParseError'] = "Invalid JSON response"
            process_logger.error(f"Failed to parse response for {accession_number}, invalid JSON response. Status code: {response.status_code}")
            details["his_integration_success"] = False
            details["his_error"] = "Invalid JSON response from HIS API"
            steps.append("his_api_failed")
            return

        study_log['HisApiResponse']['ResponseData'] = his_payload
        # A valid JSON body that is not an object (list, string, ...) has no
        # OK/MSG fields; treat it as a failed reply rather than crashing on .get().
        his_fields = his_payload if isinstance(his_payload, dict) else {}
        if response.status_code == 200 and his_fields.get('OK') == "1":
            study_log['HisApiResponse']['Success'] = True
            process_logger.info(f"Successfully sent JSON {accession_number} to HIS API")
            details["his_integration_success"] = True
            steps.append("his_api_success")
        else:
            error_msg = his_fields.get('MSG', his_fields.get('message', 'Unknown error'))
            process_logger.error(f"Failed to send JSON for {accession_number} to HIS API: {error_msg}. Study_IUID: {study_uid}")
            details["his_integration_success"] = False
            details["his_error"] = error_msg
            steps.append("his_api_failed")
    except Exception as e:
        # Deliberately broad: a HIS failure (network error, timeout, ...) must not
        # abort the migration; it is recorded and the caller continues.
        study_log['HisApiResponse'] = {
            'Success': False,
            'Error': str(e),
            'Timestamp': _timestamp()
        }
        process_logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
        details["his_integration_success"] = False
        details["his_error"] = str(e)
        steps.append("his_api_error")


def process_study_by_accession(accession_number):
    """
    Process a study by its accession number for the API service.

    This function:
    1. Finds the study (and its Study Instance UID) by accession number
    2. Retrieves the DICOM files using C-GET
    3. Sends the DICOM files to the destination PACS using C-STORE
    4. Builds a per-series study log
    5. Sends the study log to the HIS API
    6. Cleans up the temporary DICOM files
    7. Saves the study log as JSON in settings.JSON_OUTPUT_DIR

    A failed C-STORE or HIS call does not stop the run; it is recorded in the
    result and the remaining steps still execute. The function does not raise
    for errors inside the processing steps: unexpected exceptions are logged
    and returned as a failed result.

    Args:
        accession_number: The accession number of the study to process

    Returns:
        dict: ``{"success": bool, "message": str, "details": dict}``.
        ``success`` reflects the C-STORE outcome only; HIS and cleanup outcomes
        are reported in ``details`` (including ``steps_completed``).
    """
    process_logger.info(f"Processing study with accession number: {accession_number}")

    response_data = {
        "success": False,
        "message": "",
        "details": {
            "accession_number": accession_number,
            "process_start_time": _timestamp(),
            "steps_completed": []
        }
    }
    details = response_data["details"]
    steps = details["steps_completed"]
    # Stays None until the study is found, so the error handler knows whether
    # there can be anything on disk to clean up.
    study_uid = None

    try:
        # STEP 1: Find study by accession number to get the Study UID
        process_logger.info(f"Finding study with accession number: {accession_number}")
        finder = DicomFinder()
        study = finder.find_study_by_accession(accession_number)
        if not study:
            process_logger.error(f"Study not found for accession number: {accession_number}")
            response_data["message"] = f"Study not found for accession number: {accession_number}"
            details["process_end_time"] = _timestamp()
            return response_data

        study_uid = getattr(study, 'StudyInstanceUID', None)
        if not study_uid:
            # Every later step derives a directory from the UID; an empty one
            # would resolve to the DICOM store root itself.
            missing_uid_msg = f"Study found for accession number {accession_number} has no StudyInstanceUID"
            process_logger.error(missing_uid_msg)
            response_data["message"] = missing_uid_msg
            details["process_end_time"] = _timestamp()
            return response_data

        patient_id = getattr(study, 'PatientID', '')
        study_description = getattr(study, 'StudyDescription', '')
        study_date = getattr(study, 'StudyDate', '')
        study_time = getattr(study, 'StudyTime', '000000')
        details["study_uid"] = study_uid
        details["patient_id"] = patient_id
        details["study_description"] = study_description
        process_logger.info(f"Found study with UID: {study_uid} for accession number: {accession_number}")
        steps.append("study_found")

        # STEP 2: Retrieve the study using C-GET
        process_logger.info(f"Retrieving study: {study_uid}")
        retriever = DicomRetriever()
        retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
        if not retrieve_result['success']:
            process_logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
            cleanup_study_directory(study_uid)  # Clean up any partial files
            response_data["message"] = f"Failed to retrieve study: {retrieve_result['status']}"
            details["process_end_time"] = _timestamp()
            return response_data
        process_logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")
        details["instances_retrieved"] = retrieve_result['successful_instances']
        steps.append("study_retrieved")

        # STEP 3: Send the study to destination PACS using C-STORE
        process_logger.info(f"Sending study {study_uid} to destination PACS")
        sender = DicomSender()
        send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
        details["files_sent"] = send_result['successful_sends']
        details["total_files"] = send_result['total_files']
        if send_result['success']:
            process_logger.info(f"Successfully sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS")
            details["c_store_success"] = True
            steps.append("study_sent")
        else:
            process_logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
            # Processing continues even if the send fails, so the details are
            # still logged and the temporary files are still cleaned up.
            details["c_store_success"] = False
            details["c_store_error"] = send_result.get('error', 'Unknown error')

        # STEP 4: Create detailed log for the study
        process_logger.info(f"Creating detailed log for study: {study_uid}")
        series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
        study_log = {
            'Study_IUID': study_uid,
            'AccessionNumber': accession_number,
            'PatientID': patient_id,
            'StudyDescription': study_description,
            'StudyDateTime': f"{study_date}{study_time}",
            'CstoreSuccess': send_result['success'],
            'Series': _build_series_entries(finder, study_uid, series_list)
        }
        details["series_count"] = len(study_log['Series'])
        steps.append("log_created")

        # STEP 5: Send study_log to HIS API (best effort; outcome goes into details)
        _send_study_log_to_his(study_log, accession_number, study_uid, details)

        # STEP 6: Clean up DICOM files
        if cleanup_study_directory(study_uid):
            process_logger.info(f"Successfully cleaned up DICOM files for study: {study_uid}")
            details["cleanup_success"] = True
            steps.append("cleanup_success")
        else:
            process_logger.warning(f"Failed to clean up DICOM files for study: {study_uid}")
            details["cleanup_success"] = False
            steps.append("cleanup_failed")

        # STEP 7: Save the study log for reference
        log_dir = settings.JSON_OUTPUT_DIR
        create_directory_if_not_exists(log_dir)
        log_filename = f"api_{accession_number}.json"
        save_json_data(study_log, log_filename, log_dir)
        process_logger.info(f"Saved study log to {os.path.join(log_dir, log_filename)}")

        # Set final response based on C-STORE success (primary success metric)
        response_data["success"] = send_result['success']
        if not send_result['success']:
            response_data["message"] = f"Gagal migrasi file DICOM: {send_result.get('error', 'Unknown error')}"
        elif details.get("his_integration_success", False):
            response_data["message"] = f"Berhasil migrasi file DICOM Accession Number: {accession_number}"
        else:
            his_error = details.get("his_error", "Unknown HIS error")
            response_data["message"] = f"Berhasil migrasi file DICOM tetapi gagal update HIS: {his_error}"

        process_logger.info(f"Completed processing study with accession number: {accession_number}")
        details["process_end_time"] = _timestamp()
        return response_data
    except Exception as e:
        # Top-level boundary for the API: log with traceback and return a
        # failed result instead of propagating.
        process_logger.exception(f"Unexpected error processing study {accession_number}: {str(e)}")
        # Try to clean up if the study was already identified
        if study_uid:
            cleanup_study_directory(study_uid)
        response_data["message"] = f"Error during migration: {str(e)}"
        details["error"] = str(e)
        details["process_end_time"] = _timestamp()
        return response_data