Compare commits

4 Commits

Author SHA1 Message Date
mario
7da44a6b64 tested, add systemd deploy guide 2025-07-11 16:42:05 +07:00
mario
872fe6c33a add his error message response 2025-07-11 16:03:43 +07:00
mario
0cdd52db94 add http test | fix ctrl+c shutdown 2025-07-11 15:48:34 +07:00
mario
853cc70edb add: fastapi wrapped baru mau di tes di Gajah Mada 2025-07-11 15:01:27 +07:00
9 changed files with 819 additions and 1 deletions

180
api-app/README.md Normal file
View File

@@ -0,0 +1,180 @@
# DICOM Migration REST API
A simple REST API that triggers DICOM study migration for a specific Accession Number.
## How to Run
```bash
# Start the server on default port 8000
python run.py
# Custom port
python run.py --port 9000
# Custom token
python run.py --token your-secure-token
# Development mode with auto-reload
python run.py --reload
```
## How to Use
### Make a Request
```bash
# Using curl
curl -X POST "http://localhost:8000/migrate/R.20240401.0138" \
-H "Authorization: Bearer token-his2-untuk-hit-api-migrasi-clarity" \
-H "Content-Type: application/json"
```
### Response Success Example
```json
{
"success":true,
"message":"Berhasil migrasi file DICOM Accession Number: R.20240401.0138",
"details":
{
"accession_number":"R.20240401.0138",
"process_start_time":"2025-07-11 15:27:26",
"steps_completed":[
"study_found","study_retrieved","study_sent","log_created","his_api_success","cleanup_success"
],
"study_uid":"1.2.826.1.3680043.9.5282.150415.544835.202404010138",
"patient_id":"00544835",
"study_description":"A103 - Thorax PA",
"instances_retrieved":4,
"files_sent":2,
"total_files":2,
"c_store_success":true,
"series_count":4,
"his_api_status_code":200,
"his_integration_success":true,
"cleanup_success":true,
"process_end_time":"2025-07-11 15:27:46"
}
}
```
### Response Failed Example
Kalau sudah ada di tabel HIS:
```json
{
"success":true,
"message":"Berhasil migrasi file DICOM tetapi gagal update HIS: ERR: Accession No. R.20240401.0142 sudah ada ke table `pacs_order_mwl`!",
"details":{
"accession_number":"R.20240401.0142",
"process_start_time":"2025-07-11 16:05:10",
"steps_completed":[
"study_found","study_retrieved","study_sent","log_created","his_api_failed","cleanup_success"
],
"study_uid":"1.2.826.1.3680043.9.5282.150415.204342.202404010142",
"patient_id":"00204342",
"study_description":"A103 - Thorax PA",
"instances_retrieved":4,
"files_sent":2,
"total_files":2,
"c_store_success":true,
"series_count":4,
"his_api_status_code":406,
"his_integration_success":false,
"his_error":"ERR: Accession No. R.20240401.0142 sudah ada ke table `pacs_order_mwl`!","cleanup_success":true,
"process_end_time":"2025-07-11 16:05:12"
}
}
```
## Logs
- Main API logs: `logs/api.log`
- Process details: `logs/api_process.log`
## Simple Workflow
1. API receives request with accession number
2. Finds study UID using C-FIND
3. Downloads DICOM files using C-GET
4. Uploads files to destination PACS using C-STORE
5. Updates HIS system via API call
6. Cleans up all temporary DICOM files
7. Returns success/failure response
## Troubleshooting Tips
- Check PACS connectivity in `config/settings.py`
- Verify the accession number exists in source PACS
- Ensure permissions for log directories
- Check both log files for detailed error messages
- Verify HIS API connectivity for end-to-end success
## Quick Test
1. Start the server:
```bash
python run.py
```
2. In another terminal, test the root endpoint:
```bash
curl http://localhost:8000/
```
3. Test migration with a valid accession number:
```bash
curl -X POST "http://localhost:8000/migrate/R.20240401.0138" \
-H "Authorization: Bearer token-his2-untuk-hit-api-migrasi-clarity"
```
## Deploy Systemd Service
1. Create a service file:
```bash
sudo nano /etc/systemd/system/api-pydicom-migrasi.service
```
2. Add the following content:
```ini
[Unit]
Description=DICOM Migration API Service
After=network.target
[Service]
User=pacs
Group=pacs
WorkingDirectory=/home/pacs/pydicom-migrasi-clarity
Environment="PATH=/home/pacs/pydicom-migrasi-clarity/venv/bin"
Environment="API_TOKEN=$2y$05$o1Sfikmwynq76PmuBkJeROpS2WcD.Sh4lgrjohMicnlhBlGAt8UAq"
ExecStart=/home/pacs/pydicom-migrasi-clarity/venv/bin/python /home/pacs/pydicom-migrasi-clarity/api-app/run.py --host 0.0.0.0 --port 8000
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
```
3. Reload systemd and enable the service:
```bash
# Reload the systemd configuration
sudo systemctl daemon-reload
# Enable the service to start on boot
sudo systemctl enable api-pydicom-migrasi.service
# Start the service
sudo systemctl start api-pydicom-migrasi.service
# Check the status
sudo systemctl status api-pydicom-migrasi.service
```
4. Monitor logs:
```bash
# View all logs
sudo journalctl -u api-pydicom-migrasi.service
# Follow logs in real-time
sudo journalctl -u api-pydicom-migrasi.service -f
```

25
api-app/api_cleanup.py Normal file
View File

@@ -0,0 +1,25 @@
"""
Custom cleanup utilities for the API application.
This module provides customized cleanup functions that are more compatible with FastAPI.
"""
import os
import sys
import shutil
import atexit
from config import settings
from utils.logger import setup_logger
from utils.cleanup import _cleanup_dirs, cleanup_dicom_files
# Create API cleanup logger
cleanup_logger = setup_logger("api_cleanup", "logs/api_cleanup.log")
def register_api_cleanup():
"""
Register cleanup handlers for the API application.
This avoids registering signal handlers which can conflict with FastAPI.
"""
# Only register the atexit handler, not signal handlers
atexit.register(cleanup_dicom_files)
cleanup_logger.info("Registered API cleanup handler for exit")
return True

134
api-app/main.py Normal file
View File

@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
FastAPI application for DICOM migration service.
"""
import sys
import os
import json
from typing import Dict, Any, Optional
from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
# Add parent directory to sys.path to allow importing from the main project
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import the process module for API-specific processing
from process import process_study_by_accession
from utils.logger import setup_logger
from utils.dicom_utils import create_directory_if_not_exists
from api_cleanup import register_api_cleanup
# Create API logger
api_logger = setup_logger("api", "logs/api.log")
# Initialize FastAPI app
app = FastAPI(
title="DICOM Migration API",
description="API for migrating DICOM studies by accession number",
version="1.0.0"
)
# Security scheme for Bearer token authentication
security = HTTPBearer()
# Environment variable for API key (use a secure method in production)
API_TOKEN = os.environ.get("API_TOKEN", "$2y$05$o1Sfikmwynq76PmuBkJeROpS2WcD.Sh4lgrjohMicnlhBlGAt8UAq")
class MigrationResponse(BaseModel):
"""Response model for migration endpoints."""
success: bool
message: str
details: Optional[Dict[str, Any]] = None
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""
Verify the authentication token.
Args:
credentials: Bearer token credentials
Returns:
bool: True if token is valid
Raises:
HTTPException: If token is invalid
"""
if credentials.scheme != "Bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication scheme. Bearer token required."
)
if credentials.credentials != API_TOKEN:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token or expired token"
)
return True
@app.on_event("startup")
async def startup_event():
"""Initialize resources when the API starts."""
# Create necessary directories
create_directory_if_not_exists("logs")
# Use our custom API cleanup registration that avoids signal handlers
register_api_cleanup()
api_logger.info("API started successfully")
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up resources when the API shuts down."""
api_logger.info("API shutting down, cleaning up resources...")
# Perform cleanup operations here
from utils.cleanup import cleanup_dicom_files
cleanup_dicom_files()
api_logger.info("Cleanup complete, API shutdown successful")
@app.get("/", response_model=MigrationResponse)
async def root():
"""
Root endpoint to check if the API is running.
"""
return {
"success": True,
"message": "DICOM Migration API is running",
"details": {
"version": "1.0.0",
"copyright": "PT. Sadhana Abiyasa Sampoerna",
"developer": "Mario and FX Padmanto"
}
}
@app.post("/migrate/{accession_number}", response_model=MigrationResponse)
async def migrate_study(
accession_number: str,
_: bool = Depends(verify_token)
):
"""
Migrate a single study by its accession number.
Args:
accession_number: The accession number of the study to migrate
Returns:
MigrationResponse: The result of the migration operation
"""
api_logger.info(f"Received migration request for accession number: {accession_number}")
# Process the study using the specialized API processing module
result = process_study_by_accession(accession_number)
if result['success']:
api_logger.info(f"Successfully processed migration for accession number: {accession_number}")
else:
api_logger.error(f"Failed to process migration for accession number: {accession_number}: {result['message']}")
return result
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

284
api-app/process.py Normal file
View File

@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""
Process module for DICOM migration API service.
This module contains the core processing logic for migrating DICOM studies through the API.
"""
import os
import sys
import json
import shutil
from datetime import datetime
import requests
# Add parent directory to sys.path to allow importing from the main project
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import settings
from services.dicom_finder import DicomFinder
from services.dicom_retriever import DicomRetriever
from services.dicom_sender import DicomSender
from utils.logger import setup_logger
from utils.dicom_utils import save_json_data, create_directory_if_not_exists
# Create API process logger
process_logger = setup_logger("api_process", "logs/api_process.log")
def cleanup_study_directory(study_uid):
"""
Explicitly clean up the DICOM files for a study after processing.
Args:
study_uid: Study Instance UID to clean up
Returns:
bool: True if cleanup was successful, False otherwise
"""
study_dir = os.path.join(settings.DICOM_STORE_DIR, study_uid)
process_logger.info(f"Cleaning up DICOM files for study: {study_uid} at {study_dir}")
try:
if os.path.exists(study_dir):
shutil.rmtree(study_dir)
process_logger.info(f"Successfully removed study directory: {study_dir}")
return True
else:
process_logger.warning(f"Study directory does not exist: {study_dir}")
return True # Return True since there's nothing to clean
except Exception as e:
process_logger.error(f"Error cleaning up study directory {study_dir}: {str(e)}")
return False
def process_study_by_accession(accession_number):
"""
Process a study by its accession number for the API service.
This function:
1. Finds the study UID based on accession number
2. Retrieves DICOM files using C-GET
3. Sends DICOM files to destination PACS using C-STORE
4. Sends study information to HIS API
5. Cleans up temporary DICOM files
Args:
accession_number: The accession number of the study to process
Returns:
dict: Result with success status, message, and details
"""
process_logger.info(f"Processing study with accession number: {accession_number}")
# Initialize response data
response_data = {
"success": False,
"message": "",
"details": {
"accession_number": accession_number,
"process_start_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"steps_completed": []
}
}
try:
# STEP 1: Find study by accession number to get the Study UID
process_logger.info(f"Finding study with accession number: {accession_number}")
finder = DicomFinder()
study = finder.find_study_by_accession(accession_number)
if not study:
process_logger.error(f"Study not found for accession number: {accession_number}")
response_data["message"] = f"Study not found for accession number: {accession_number}"
return response_data
study_uid = study.StudyInstanceUID
patient_id = getattr(study, 'PatientID', '')
study_description = getattr(study, 'StudyDescription', '')
study_date = getattr(study, 'StudyDate', '')
study_time = getattr(study, 'StudyTime', '000000')
response_data["details"]["study_uid"] = study_uid
response_data["details"]["patient_id"] = patient_id
response_data["details"]["study_description"] = study_description
process_logger.info(f"Found study with UID: {study_uid} for accession number: {accession_number}")
response_data["details"]["steps_completed"].append("study_found")
# STEP 2: Retrieve the study using C-GET
process_logger.info(f"Retrieving study: {study_uid}")
retriever = DicomRetriever()
retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
if not retrieve_result['success']:
process_logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
cleanup_study_directory(study_uid) # Clean up any partial files
response_data["message"] = f"Failed to retrieve study: {retrieve_result['status']}"
return response_data
process_logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")
response_data["details"]["instances_retrieved"] = retrieve_result['successful_instances']
response_data["details"]["steps_completed"].append("study_retrieved")
# STEP 3: Send the study to destination PACS using C-STORE
process_logger.info(f"Sending study {study_uid} to destination PACS")
sender = DicomSender()
send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
response_data["details"]["files_sent"] = send_result['successful_sends']
response_data["details"]["total_files"] = send_result['total_files']
if not send_result['success']:
process_logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
# We continue processing even if send fails, to log the details
response_data["details"]["c_store_success"] = False
response_data["details"]["c_store_error"] = send_result.get('error', 'Unknown error')
else:
process_logger.info(f"Successfully sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS")
response_data["details"]["c_store_success"] = True
response_data["details"]["steps_completed"].append("study_sent")
# STEP 4: Create detailed log for the study
process_logger.info(f"Creating detailed log for study: {study_uid}")
series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
study_datetime = f"{study_date}{study_time}"
study_log = {
'Study_IUID': study_uid,
'AccessionNumber': accession_number,
'PatientID': patient_id,
'StudyDescription': study_description,
'StudyDateTime': study_datetime,
'CstoreSuccess': send_result['success'],
'Series': []
}
for series in series_list:
series_uid = series.SeriesInstanceUID
# Get first instance for the series
instance = finder.find_first_instance_for_series(study_uid, series_uid)
# Helper function to safely get attributes
def safe_get_attr(obj, attr_name, default=''):
"""Get attribute value, ensuring None is converted to default."""
value = getattr(obj, attr_name, default)
return default if value is None else str(value)
series_info = {
'Series_IUID': safe_get_attr(series, 'SeriesInstanceUID'),
'SeriesNumber': safe_get_attr(series, 'SeriesNumber'),
'SeriesDescription': safe_get_attr(series, 'SeriesDescription'),
'NumberOfInstances': safe_get_attr(series, 'NumberOfSeriesRelatedInstances'),
'SOP_IUID': safe_get_attr(instance, 'SOPInstanceUID') if instance else ''
}
study_log['Series'].append(series_info)
response_data["details"]["series_count"] = len(study_log['Series'])
response_data["details"]["steps_completed"].append("log_created")
# STEP 5: Send study_log to HIS API
process_logger.info(f"Sending study data to HIS API for accession: {accession_number}")
his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
try:
headers = {
'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==',
'Content-Type': 'application/json'
}
response = requests.post(his_url, json=study_log, headers=headers)
# Add response data to the log
study_log['HisApiResponse'] = {
'StatusCode': response.status_code,
'ResponseText': response.text,
'Success': False,
'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
response_data["details"]["his_api_status_code"] = response.status_code
# Parse the response
try:
response_data_his = response.json() if response.content else {}
study_log['HisApiResponse']['ResponseData'] = response_data_his
if response.status_code == 200 and response_data_his.get('OK') == "1":
study_log['HisApiResponse']['Success'] = True
process_logger.info(f"Successfully sent JSON {accession_number} to HIS API")
response_data["details"]["his_integration_success"] = True
response_data["details"]["steps_completed"].append("his_api_success")
else:
error_msg = response_data_his.get('MSG', response_data_his.get('message', 'Unknown error'))
process_logger.error(f"Failed to send JSON for {accession_number} to HIS API: {error_msg}. Study_IUID: {study_uid}")
response_data["details"]["his_integration_success"] = False
response_data["details"]["his_error"] = error_msg
response_data["details"]["steps_completed"].append("his_api_failed")
except ValueError:
study_log['HisApiResponse']['ParseError'] = "Invalid JSON response"
process_logger.error(f"Failed to parse response for {accession_number}, invalid JSON response. Status code: {response.status_code}")
response_data["details"]["his_integration_success"] = False
response_data["details"]["his_error"] = "Invalid JSON response from HIS API"
response_data["details"]["steps_completed"].append("his_api_failed")
except Exception as e:
study_log['HisApiResponse'] = {
'Success': False,
'Error': str(e),
'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
process_logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
response_data["details"]["his_integration_success"] = False
response_data["details"]["his_error"] = str(e)
response_data["details"]["steps_completed"].append("his_api_error")
# STEP 6: Clean up DICOM files
cleanup_result = cleanup_study_directory(study_uid)
if cleanup_result:
process_logger.info(f"Successfully cleaned up DICOM files for study: {study_uid}")
response_data["details"]["cleanup_success"] = True
response_data["details"]["steps_completed"].append("cleanup_success")
else:
process_logger.warning(f"Failed to clean up DICOM files for study: {study_uid}")
response_data["details"]["cleanup_success"] = False
response_data["details"]["steps_completed"].append("cleanup_failed")
# STEP 7: Save the study log for reference
log_dir = settings.JSON_OUTPUT_DIR
create_directory_if_not_exists(log_dir)
log_filename = f"api_{accession_number}.json"
save_json_data(study_log, log_filename, log_dir)
process_logger.info(f"Saved study log to {os.path.join(log_dir, log_filename)}")
# Set final response based on C-STORE success (primary success metric)
response_data["success"] = send_result['success']
if send_result['success']:
his_success = response_data["details"].get("his_integration_success", False)
if his_success:
response_data["message"] = f"Berhasil migrasi file DICOM Accession Number: {accession_number}"
else:
his_error = response_data["details"].get("his_error", "Unknown HIS error")
response_data["message"] = f"Berhasil migrasi file DICOM tetapi gagal update HIS: {his_error}"
else:
response_data["message"] = f"Gagal migrasi file DICOM: {send_result.get('error', 'Unknown error')}"
process_logger.info(f"Completed processing study with accession number: {accession_number}")
response_data["details"]["process_end_time"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return response_data
except Exception as e:
process_logger.exception(f"Unexpected error processing study {accession_number}: {str(e)}")
# Try to clean up if we have a study_uid
if 'study_uid' in locals():
cleanup_study_directory(study_uid)
response_data["message"] = f"Error during migration: {str(e)}"
response_data["details"]["error"] = str(e)
response_data["details"]["process_end_time"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return response_data

9
api-app/requirements.txt Normal file
View File

@@ -0,0 +1,9 @@
fastapi>=0.95.0
uvicorn>=0.22.0
pydicom>=2.3.0
pynetdicom>=2.0.0
python-dateutil>=2.8.2
retry==0.9.2
requests>=2.28.0
python-jose>=3.3.0
python-multipart>=0.0.6

52
api-app/run.py Normal file
View File

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""
Script to run the FastAPI server for DICOM migration service.
"""
import os
import sys
import signal
import uvicorn
import argparse
def parse_args():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description='DICOM Migration API Server')
parser.add_argument('--host', default='0.0.0.0', help='Host to bind to')
parser.add_argument('--port', type=int, default=8000, help='Port to bind to')
parser.add_argument('--token', help='API token for authentication (overrides environment variable)')
parser.add_argument('--reload', action='store_true', help='Enable auto-reload for development')
return parser.parse_args()
def handle_exit(signum, frame):
"""Handle exit signals gracefully."""
print(f"Received signal {signum}, shutting down gracefully...")
# Let uvicorn handle the exit
sys.exit(0)
def main():
"""Main function to run the API server."""
args = parse_args()
# Set API token if provided
if args.token:
os.environ['API_TOKEN'] = args.token
# Check if API_TOKEN is set
if not os.environ.get('API_TOKEN'):
print("WARNING: API_TOKEN not set. Using default token '$2y$05$o1Sfikmwynq76PmuBkJeROpS2WcD.Sh4lgrjohMicnlhBlGAt8UAq'.")
print("Set environment variable API_TOKEN or use --token argument for better security.")
# Set up signal handlers for graceful shutdown
signal.signal(signal.SIGINT, handle_exit)
signal.signal(signal.SIGTERM, handle_exit)
print(f"Starting DICOM Migration API server on {args.host}:{args.port}")
uvicorn.run(
"main:app",
host=args.host,
port=args.port,
reload=args.reload
)
if __name__ == "__main__":
main()

17
api-app/test.http Normal file
View File

@@ -0,0 +1,17 @@
@host = http://localhost:8000
@token = $2y$05$o1Sfikmwynq76PmuBkJeROpS2WcD.Sh4lgrjohMicnlhBlGAt8UAq
### Healtcheck
GET {{host}}/
###
POST {{host}}/migrate/R.20240401.0138
Authorization: Bearer {{token}}
Content-Type: application/json
####
POST {{host}}/migrate/R.20240401.0142
Authorization: Bearer {{token}}
Content-Type: application/json

View File

@@ -347,4 +347,82 @@ class DicomFinder:
logger.error(error_msg)
raise DicomQueryError(error_msg)
return study
@dicom_retry(exception_types=(DicomQueryError, ConnectionError))
def find_study_by_accession(self, accession_number):
"""
Find a specific study by its AccessionNumber.
Args:
accession_number (str): AccessionNumber to find
Returns:
Dataset: Study dataset or None if not found
"""
logger.info(f"Finding study with AccessionNumber: {accession_number}")
# Create query dataset
ds = Dataset()
ds.QueryRetrieveLevel = 'STUDY'
# Set the AccessionNumber filter
ds.AccessionNumber = accession_number
# Required fields (minimal set)
ds.StudyInstanceUID = ''
ds.StudyDate = ''
# Additional fields we want to retrieve
ds.PatientID = '' # MedrecID
ds.StudyDescription = ''
ds.StudyTime = ''
ds.NumberOfStudyRelatedSeries = ''
# Create association
assoc = self.ae.associate(
self.pacs_config['host'],
self.pacs_config['port'],
ae_title=self.pacs_config['aet']
)
study = None
if assoc.is_established:
try:
logger.debug("Association established, sending C-FIND request")
# Send C-FIND request
responses = assoc.send_c_find(
ds,
StudyRootQueryRetrieveInformationModelFind
)
# Process responses - just get the first one
for (status, dataset) in responses:
if status and status.Status == 0xFF00: # Pending
if dataset:
study = dataset
logger.debug(f"Found study: {dataset.StudyInstanceUID} for AccessionNumber: {accession_number}")
break
elif status and status.Status != 0x0000: # Not success
logger.error(f"C-FIND error: {status}")
if study:
logger.info(f"Found study with AccessionNumber {accession_number}, StudyUID: {study.StudyInstanceUID}")
else:
logger.warning(f"No study found with AccessionNumber {accession_number}")
except Exception as e:
logger.error(f"Error during C-FIND by AccessionNumber: {str(e)}")
raise DicomQueryError(f"C-FIND operation failed: {str(e)}")
finally:
# Release the association
assoc.release()
logger.debug("Association released")
else:
error_msg = f"Association rejected, aborted or never connected to {self.pacs_config['aet']}"
logger.error(error_msg)
raise DicomQueryError(error_msg)
return study

View File

@@ -2,6 +2,7 @@
DICOM file cleanup utility to ensure temporary DICOM files are removed even on script termination.
"""
import os
import sys
import shutil
import atexit
import signal
@@ -42,6 +43,36 @@ def cleanup_dicom_files():
# Clear the registry after cleanup
_cleanup_dirs.clear()
def cleanup_study_directory(study_uid):
"""
Immediately remove the directory for a specific study.
Args:
study_uid (str): Study Instance UID of the study to clean up
Returns:
bool: True if cleanup was successful, False otherwise
"""
study_dir = os.path.join(settings.DICOM_STORE_DIR, study_uid)
if not os.path.exists(study_dir):
logger.debug(f"Cleanup: Study directory does not exist: {study_dir}")
return True
try:
shutil.rmtree(study_dir)
logger.info(f"Cleanup: Removed DICOM files from {study_dir}")
# Also remove from the registry if it's there
global _cleanup_dirs
if study_dir in _cleanup_dirs:
_cleanup_dirs.remove(study_dir)
return True
except Exception as e:
logger.error(f"Cleanup: Failed to remove DICOM files from {study_dir}: {str(e)}")
return False
def register_exit_handlers():
"""
Register cleanup handlers for various exit scenarios.
@@ -70,4 +101,12 @@ def signal_handler(sig, frame):
logger.info(f"Received {signal_name}, cleaning up...")
cleanup_dicom_files()
exit(0)
# Don't call exit() directly in a web application context
# Let the calling application handle the exit process
# This fix makes it compatible with FastAPI and other web frameworks
# Raise SystemExit instead of calling exit() directly
if hasattr(frame, '_is_web_app') and frame._is_web_app:
return
else:
# For command-line applications, we can exit directly
sys.exit(0)