Files
pydicom-google-uploader/main.py
2025-04-22 19:13:49 +07:00

295 lines
11 KiB
Python

import json
import logging
import os
import shutil
import tempfile
import time
import uuid
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler

import requests
from pydicom import dcmread
from pydicom.dataset import Dataset
from pynetdicom import AE, StoragePresentationContexts, evt, build_role
from pynetdicom.sop_class import PatientRootQueryRetrieveInformationModelGet

import config  # config.py
# Logging setup: the 'DicomUploader' logger writes every record both to the
# console and to a size-rotated log file, using one shared line format.
logger = logging.getLogger('DicomUploader')
logger.setLevel(getattr(logging, config.LOG_LEVEL))
# Records stay on this logger only; they are not handed on to the root logger.
logger.propagate = False

formatter = logging.Formatter(
    fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Console output
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# File output; rotation size and number of backups come from config.py
file_handler = RotatingFileHandler(
    filename=config.LOG_FILE,
    maxBytes=config.LOG_MAX_SIZE,
    backupCount=config.LOG_BACKUP_COUNT,
    encoding='utf-8',
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
class DicomUploader:
    """Copy DICOM instances from a PACS to a DICOMweb endpoint.

    For every pending upload request returned by the orders API, the
    uploader retrieves the matching instances from the PACS with C-GET,
    posts each file to ``{proxy_url}/dicomWeb/studies`` as a
    ``multipart/related`` request and then reports a status for the order.
    Retrieved files are kept in a private temporary directory which
    ``cleanup()`` deletes.
    """

    # Endpoint listing the upload requests. Kept as a class attribute so a
    # deployment or a subclass can point the uploader at another API.
    PENDING_ORDERS_URL = (
        "https://devone.aplikasi.web.id/one-api/mockup/godicomupreq/"
        "godicomupreq/get_uprequests"
    )

    # (connect, read) timeout in seconds applied to every HTTP call.
    # Without a timeout, requests waits forever on a stalled server.
    REQUEST_TIMEOUT = (10, 120)

    # Storage SOP classes the PACS is allowed to send back during C-GET.
    STORAGE_SOP_CLASS_UIDS = (
        '1.2.840.10008.5.1.4.1.1.1',      # CR Storage
        '1.2.840.10008.5.1.4.1.1.1.1',    # Digital X-Ray Image Storage
        '1.2.840.10008.5.1.4.1.1.2',      # CT Image Storage
        '1.2.840.10008.5.1.4.1.1.4',      # MR Image Storage
        '1.2.840.10008.5.1.4.1.1.7',      # Secondary Capture Image Storage
        '1.2.840.10008.5.1.4.1.1.6.1',    # Ultrasound Image Storage
        '1.2.840.10008.5.1.4.1.1.128',    # PET Image Storage
        '1.2.840.10008.5.1.4.1.1.20',     # Nuclear Medicine Image Storage
        '1.2.840.10008.5.1.4.1.1.9.1.1',  # 12-lead ECG Waveform Storage
        '1.2.840.10008.5.1.4.1.1.9.1.2',  # General ECG Waveform Storage
    )

    def __init__(self, pacs_host, pacs_port, pacs_ae_title, local_ae_title, proxy_url):
        """
        Initialize the DICOM uploader

        Args:
            pacs_host: Hostname/IP of the PACS server
            pacs_port: Port number of the PACS server
            pacs_ae_title: AE Title of the PACS server
            local_ae_title: Local AE Title to use for connection
            proxy_url: URL of the Go OHIF Proxy
        """
        self.pacs_host = pacs_host
        self.pacs_port = pacs_port
        self.pacs_ae_title = pacs_ae_title
        self.local_ae_title = local_ae_title
        self.proxy_url = proxy_url
        # Scratch directory for retrieved files; removed again by cleanup().
        self.temp_dir = tempfile.mkdtemp()
        logger.info(f"Created temporary directory: {self.temp_dir}")

    def _get_pending_orders(self):
        """
        Fetch pending orders from the API

        Queries ``PENDING_ORDERS_URL`` with ``startDate`` (yesterday),
        ``endDate`` (today) and ``status`` ('0' = pending).

        Returns:
            The list found under the ``data`` key of the response, or an
            empty list when the request fails, the body is not valid JSON
            or the payload does not have the expected shape.
        """
        today = datetime.now()
        yesterday = today - timedelta(days=1)
        params = {
            'startDate': yesterday.strftime('%Y-%m-%d'),
            'endDate': today.strftime('%Y-%m-%d'),
            'status': '0',  # Status 0 for pending orders
        }
        try:
            response = requests.get(
                self.PENDING_ORDERS_URL,
                params=params,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            response_data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            logger.error(f"Failed to fetch pending orders: {e}")
            return []

        # Expected shape: {"status": "OK", "data": [ {...order...}, ... ]}
        if (
            isinstance(response_data, dict)
            and response_data.get("status") == "OK"
            and isinstance(response_data.get("data"), list)
        ):
            return response_data["data"]

        logger.warning("API returned a response without data or with non-OK status")
        return []

    @staticmethod
    def _save_as_dicom_file(dataset, filepath):
        """Write *dataset* to *filepath* in the DICOM File Format.

        Tries the ``enforce_file_format`` keyword first and falls back to
        ``write_like_original=False`` when ``save_as`` rejects that keyword
        with a ``TypeError`` (the spelling used by older pydicom releases).
        """
        try:
            dataset.save_as(filepath, enforce_file_format=True)
        except TypeError:
            dataset.save_as(filepath, write_like_original=False)

    def _retrieve_dicom(self, study_instance_uid, series_instance_uid=None, sop_instance_uid=None):
        """
        Retrieve DICOM files from PACS using C-GET

        The query level is IMAGE when ``sop_instance_uid`` is given, SERIES
        when only ``series_instance_uid`` is given and STUDY otherwise.

        Returns:
            List of paths to retrieved DICOM files. Errors are logged, not
            raised; files stored before an error are still returned.
        """
        ae = AE(ae_title=self.local_ae_title)
        ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)

        # NOTE: if a presentation-context error shows up while answering the
        # C-STORE sub-operations, this is the place to check. Each storage
        # class needs a requested context and an SCP role entry.
        ext_neg = []
        for uid in self.STORAGE_SOP_CLASS_UIDS:
            ae.add_requested_context(uid)
            ext_neg.append(build_role(uid, scp_role=True))

        retrieved_files = []

        def handle_store(event):
            """Handle a C-STORE request from the peer"""
            dataset = event.dataset
            # event.dataset carries no file meta information; attach it so
            # the saved file has a proper header and transfer syntax.
            dataset.file_meta = event.file_meta
            filepath = os.path.join(self.temp_dir, f"{dataset.SOPInstanceUID}.dcm")
            self._save_as_dicom_file(dataset, filepath)
            retrieved_files.append(filepath)
            return 0x0000  # Success

        handlers = [(evt.EVT_C_STORE, handle_store)]

        try:
            assoc = ae.associate(
                self.pacs_host,
                self.pacs_port,
                ae_title=self.pacs_ae_title,
                ext_neg=ext_neg,  # SCP role selection for the storage contexts
                evt_handlers=handlers,
            )
            if not assoc.is_established:
                logger.error("Association with PACS failed")
                return retrieved_files

            try:
                ds = Dataset()
                ds.StudyInstanceUID = study_instance_uid
                if sop_instance_uid:
                    ds.QueryRetrieveLevel = 'IMAGE'
                    ds.SeriesInstanceUID = series_instance_uid
                    ds.SOPInstanceUID = sop_instance_uid
                elif series_instance_uid:
                    ds.QueryRetrieveLevel = 'SERIES'
                    ds.SeriesInstanceUID = series_instance_uid
                else:
                    ds.QueryRetrieveLevel = 'STUDY'

                # Send C-GET request and collect responses
                responses = assoc.send_c_get(ds, PatientRootQueryRetrieveInformationModelGet)
                for status, identifier in responses:
                    if not status:
                        # An empty status dataset means no valid response arrived.
                        logger.error("Connection timed out, was aborted or received invalid response")
                        continue
                    status_int = getattr(status, 'Status', 0)
                    logger.info(f"C-GET status: 0x{status_int:04x}")
                    # The failed-instance list can accompany any warning or
                    # failure status, so report it whenever it is present.
                    if identifier and hasattr(identifier, 'FailedSOPInstanceUIDList'):
                        logger.error(f"Failed to retrieve: {identifier.FailedSOPInstanceUIDList}")
            finally:
                # Release even when the C-GET raised, so the association is
                # never left open on the PACS.
                if assoc.is_established:
                    assoc.release()
        except Exception as e:
            logger.error(f"Error during DICOM retrieval: {e}")
        return retrieved_files

    @staticmethod
    def _build_multipart_body(boundary, dicom_content):
        """Return the multipart/related request body for one DICOM part.

        Args:
            boundary: Multipart boundary string (without leading dashes).
            dicom_content: Raw bytes of the DICOM file.

        Returns:
            ``bytes``: opening boundary, part header, content, closing boundary.
        """
        head = f'--{boundary}\r\nContent-Type: application/dicom\r\n\r\n'.encode('utf-8')
        tail = f'\r\n--{boundary}--'.encode('utf-8')
        return b''.join((head, dicom_content, tail))

    def _upload_dicom_to_cloud(self, filepath):
        """Upload a DICOM file to Google Healthcare API via Go OHIF Proxy

        Returns:
            True when the proxy answered with a success status, False on any
            error (the error is logged).
        """
        proxy_url = f"{self.proxy_url}/dicomWeb/studies"
        boundary = f"bd{uuid.uuid4()}"
        try:
            with open(filepath, 'rb') as dicom_file:
                dicom_content = dicom_file.read()
            headers = {
                'Content-Type': f'multipart/related; type="application/dicom"; boundary="{boundary}"',
                'Accept': '*/*',
                'Origin': self.proxy_url,
            }
            data = self._build_multipart_body(boundary, dicom_content)
            response = requests.post(
                proxy_url,
                headers=headers,
                data=data,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            logger.info(f"Successfully uploaded {filepath}")
            return True
        except Exception as e:
            logger.error(f"Failed to upload {filepath}: {e}")
            return False

    def _update_status(self, order_id, status):
        """Update order status in database (placeholder)

        Currently only writes a log line; no remote update is performed.
        """
        logger.info(f"Updated order {order_id} status to {status}")
        # Implement actual status update logic here

    @staticmethod
    def _final_status(success_count, failure_count):
        """Map upload counters to the status string reported for an order."""
        if success_count > 0 and failure_count == 0:
            return "COMPLETED"
        if success_count > 0:
            return "PARTIALLY_COMPLETED"
        # Nothing uploaded: either nothing was retrieved or every upload failed.
        return "FAILED"

    def _remove_file(self, filepath):
        """Delete a retrieved file; a failure is logged, never raised."""
        try:
            os.remove(filepath)
        except OSError as e:
            logger.warning(f"Could not remove temporary file {filepath}: {e}")

    def _process_order(self, order_id, order):
        """Retrieve, upload and report the status of a single order dict."""
        study_uid = order.get('GdcUpreq_StudyIUID')
        series_uid = order.get('GdcUpreq_SeriesIUID')
        sop_uid = order.get('GdcUpreq_SopIUID')
        logger.info(f"Processing order {order_id} for study {study_uid}")

        if not study_uid:
            # Every query level needs the study UID; do not send a C-GET
            # with an empty key.
            logger.error(f"Order {order_id} has no study UID; nothing to retrieve")
            self._update_status(order_id, "FAILED")
            return

        # Retrieve DICOM files
        dicom_files = self._retrieve_dicom(study_uid, series_uid, sop_uid)
        logger.info(f"Retrieved {len(dicom_files)} DICOM files")

        success_count = 0
        failure_count = 0
        for filepath in dicom_files:
            try:
                if self._upload_dicom_to_cloud(filepath):
                    success_count += 1
                else:
                    failure_count += 1
            finally:
                # Clean up the file whether or not the upload worked; a
                # failed delete must not abort the remaining uploads.
                self._remove_file(filepath)

        self._update_status(order_id, self._final_status(success_count, failure_count))

    def process_pending_orders(self):
        """Process all pending orders

        An error in one order marks that order FAILED and processing
        continues with the next one.
        """
        orders = self._get_pending_orders()
        logger.info(f"Found {len(orders)} pending orders")
        for order in orders:
            if not isinstance(order, dict):
                # Without a dict there is no order id to report against.
                logger.error(f"Skipping malformed order entry: {order!r}")
                continue
            order_id = order.get('GdcUpreqID')
            try:
                self._process_order(order_id, order)
            except Exception as e:
                logger.error(f"Error processing order: {e}")
                self._update_status(order_id, "FAILED")

    def cleanup(self):
        """Clean up temporary files

        Removes the temporary directory together with any files still in
        it, so retrieved data is not left behind after a failed run.
        """
        try:
            shutil.rmtree(self.temp_dir)
            logger.info(f"Removed temporary directory: {self.temp_dir}")
        except OSError as e:
            logger.warning(f"Could not remove temporary directory: {self.temp_dir}: {e}")
def main():
    """Run one upload pass and terminate the process with exit code 0.

    Builds a DicomUploader from the values in ``config``, processes all
    pending orders and always removes the uploader's temporary directory,
    even when processing raises.

    Raises:
        SystemExit: with code 0 once processing and cleanup have finished.
    """
    uploader = DicomUploader(
        pacs_host=config.PACS_HOST,
        pacs_port=config.PACS_PORT,
        pacs_ae_title=config.PACS_AE_TITLE,
        local_ae_title=config.LOCAL_AE_TITLE,
        proxy_url=config.PROXY_URL,
    )
    try:
        uploader.process_pending_orders()
    finally:
        uploader.cleanup()
    logger.info("Finished processing orders")
    # The bare exit() helper is injected by the site module for interactive
    # use and may be missing (e.g. python -S); raising SystemExit has the
    # same effect and needs no import.
    raise SystemExit(0)


if __name__ == "__main__":
    main()