342 lines
13 KiB
Python
342 lines
13 KiB
Python
import config # config.py
|
|
|
|
import os
|
|
import time
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
import json
|
|
import requests
|
|
import tempfile
|
|
from pynetdicom import AE, StoragePresentationContexts, evt, build_role
|
|
from pydicom.dataset import Dataset
|
|
from pydicom import dcmread
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
from pynetdicom.sop_class import PatientRootQueryRetrieveInformationModelGet
|
|
|
|
# Configure logging
|
|
logger = logging.getLogger('DicomUploader')
|
|
logger.setLevel(getattr(logging, config.LOG_LEVEL))
|
|
|
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S')
|
|
|
|
# Add console handler
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(formatter)
|
|
logger.addHandler(console_handler)
|
|
|
|
# Add rotating file handler
|
|
file_handler = RotatingFileHandler(
|
|
config.LOG_FILE,
|
|
maxBytes=config.LOG_MAX_SIZE,
|
|
backupCount=config.LOG_BACKUP_COUNT,
|
|
encoding='utf-8'
|
|
)
|
|
file_handler.setFormatter(formatter)
|
|
logger.addHandler(file_handler)
|
|
|
|
# Prevent propagation to root logger
|
|
logger.propagate = False
|
|
|
|
class DicomUploader:
|
|
def __init__(self, pacs_host, pacs_port, pacs_ae_title, local_ae_title, proxy_url):
|
|
"""
|
|
Initialize the DICOM uploader
|
|
|
|
Args:
|
|
pacs_host: Hostname/IP of the PACS server
|
|
pacs_port: Port number of the PACS server
|
|
pacs_ae_title: AE Title of the PACS server
|
|
local_ae_title: Local AE Title to use for connection
|
|
proxy_url: URL of the Go OHIF Proxy
|
|
"""
|
|
self.pacs_host = pacs_host
|
|
self.pacs_port = pacs_port
|
|
self.pacs_ae_title = pacs_ae_title
|
|
self.local_ae_title = local_ae_title
|
|
self.proxy_url = proxy_url
|
|
self.temp_dir = tempfile.mkdtemp()
|
|
logger.info(f"Created temporary directory: {self.temp_dir}")
|
|
|
|
def _get_pending_orders(self):
|
|
"""
|
|
Fetch pending orders from the API
|
|
|
|
Uses startDate, endDate, and status parameters to filter results
|
|
Status 0 indicates pending orders
|
|
"""
|
|
# Using the corrected endpoint
|
|
url = f"{config.API_URL}/get_uprequests"
|
|
|
|
# Add required query parameters
|
|
today = datetime.now()
|
|
yesterday = today - timedelta(days=1)
|
|
|
|
params = {
|
|
'startDate': yesterday.strftime('%Y-%m-%d'),
|
|
'endDate': today.strftime('%Y-%m-%d'),
|
|
'status': '0' # Status 0 for pending orders
|
|
}
|
|
|
|
try:
|
|
response = requests.get(url, params=params)
|
|
response.raise_for_status()
|
|
response_data = response.json()
|
|
|
|
# Parse the nested JSON structure
|
|
if response_data.get("status") == "OK" and "data" in response_data:
|
|
return response_data["data"]
|
|
else:
|
|
logger.warning("API returned a response without data or with non-OK status")
|
|
return []
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Failed to fetch pending orders: {e}")
|
|
return []
|
|
|
|
def _retrieve_dicom(self, study_instance_uid, series_instance_uid=None, sop_instance_uid=None):
|
|
"""
|
|
Retrieve DICOM files from PACS using C-GET
|
|
|
|
Returns:
|
|
List of paths to retrieved DICOM files
|
|
"""
|
|
ae = AE(ae_title=self.local_ae_title)
|
|
ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)
|
|
|
|
# ae.maximum_pdu_size = 130816 # * Kalau ingin set maximum PDU size to 128kB. Harus set di sisi DCM4CHEE juga
|
|
|
|
storage_uids = [
|
|
'1.2.840.10008.5.1.4.1.1.1', # CR Storage
|
|
'1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage
|
|
'1.2.840.10008.5.1.4.1.1.2', # CT Image Storage
|
|
'1.2.840.10008.5.1.4.1.1.4', # MR Image Storage
|
|
'1.2.840.10008.5.1.4.1.1.7', # Secondary Capture Image Storage
|
|
'1.2.840.10008.5.1.4.1.1.6.1', # Ultrasound Image Storage
|
|
'1.2.840.10008.5.1.4.1.1.128', # PET Image Storage
|
|
'1.2.840.10008.5.1.4.1.1.20', # Nuclear Medicine Image Storage
|
|
'1.2.840.10008.5.1.4.1.1.9.1.1', # 12-lead ECG Waveform Storage
|
|
'1.2.840.10008.5.1.4.1.1.9.1.2', # General ECG Waveform Storage
|
|
]
|
|
|
|
# * == Kalau error terkait context ketika C-STORE RSP cek disini
|
|
ext_neg = []
|
|
for uid in storage_uids:
|
|
ae.add_requested_context(uid)
|
|
role = build_role(uid, scp_role=True)
|
|
ext_neg.append(role)
|
|
|
|
retrieved_files = []
|
|
|
|
def handle_store(event):
|
|
"""Handle a C-STORE request from the peer"""
|
|
dataset = event.dataset
|
|
|
|
# Save the dataset to a temporary file
|
|
filename = f"{dataset.SOPInstanceUID}.dcm"
|
|
filepath = os.path.join(self.temp_dir, filename)
|
|
dataset.save_as(filepath)
|
|
|
|
retrieved_files.append(filepath)
|
|
|
|
return 0x0000 # Success
|
|
|
|
handlers = [(evt.EVT_C_STORE, handle_store)]
|
|
|
|
try:
|
|
assoc = ae.associate(
|
|
self.pacs_host,
|
|
self.pacs_port,
|
|
ae_title=self.pacs_ae_title,
|
|
ext_neg=ext_neg, # No extended negotiation
|
|
evt_handlers=handlers
|
|
)
|
|
|
|
if assoc.is_established:
|
|
ds = Dataset()
|
|
|
|
# * Kalau level image nanti cuma 1 image aja yang digetscu
|
|
# if sop_instance_uid:
|
|
# ds.QueryRetrieveLevel = 'IMAGE'
|
|
# ds.StudyInstanceUID = study_instance_uid
|
|
# ds.SeriesInstanceUID = series_instance_uid
|
|
# ds.SOPInstanceUID = sop_instance_uid
|
|
|
|
# * Kalau level series nanti cget semua image dalam 1 series id itu
|
|
# elif series_instance_uid:
|
|
# ds.QueryRetrieveLevel = 'SERIES'
|
|
# ds.StudyInstanceUID = study_instance_uid
|
|
# ds.SeriesInstanceUID = series_instance_uid
|
|
|
|
# * Kalau level study nanti cget semua image dalam 1 study id itu
|
|
# else:
|
|
# ds.QueryRetrieveLevel = 'STUDY'
|
|
# ds.StudyInstanceUID = study_instance_uid
|
|
|
|
# * Default: Study level. Sampai ada kasus yang membutuhkan level lain
|
|
ds.QueryRetrieveLevel = 'STUDY'
|
|
ds.StudyInstanceUID = study_instance_uid
|
|
|
|
# Send C-GET request and collect responses
|
|
responses = assoc.send_c_get(ds, PatientRootQueryRetrieveInformationModelGet)
|
|
|
|
logger.info(f"Query Retrieval Level: {ds.QueryRetrieveLevel}")
|
|
|
|
for status, identifier in responses:
|
|
if status:
|
|
status_int = getattr(status, 'Status', 0)
|
|
logger.info(f"C-GET status: 0x{status_int:04x}")
|
|
|
|
# Check for specific status codes
|
|
if status_int == 0xa702: # Failed SOP Instance
|
|
if identifier and hasattr(identifier, 'FailedSOPInstanceUIDList'):
|
|
logger.error(f"Failed to retrieve: {identifier.FailedSOPInstanceUIDList}")
|
|
|
|
assoc.release()
|
|
else:
|
|
logger.error("Association with PACS failed")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during DICOM retrieval: {e}")
|
|
|
|
return retrieved_files
|
|
|
|
def _upload_dicom_to_cloud(self, filepath):
|
|
"""Upload a DICOM file to Google Healthcare API via Go OHIF Proxy"""
|
|
proxy_url = f"{self.proxy_url}/dicomWeb/studies"
|
|
boundary = f"bd{uuid.uuid4()}"
|
|
|
|
try:
|
|
with open(filepath, 'rb') as dicom_file:
|
|
dicom_content = dicom_file.read()
|
|
|
|
headers = {
|
|
'Content-Type': f'multipart/related; type="application/dicom"; boundary="{boundary}"',
|
|
'Accept': '*/*',
|
|
'Origin': self.proxy_url,
|
|
}
|
|
|
|
# Format payload with boundary
|
|
payload = f'--{boundary}\r\n'
|
|
payload += 'Content-Type: application/dicom\r\n\r\n'
|
|
|
|
# Combine payload with binary data and closing boundary
|
|
data = payload.encode('utf-8') + dicom_content + f'\r\n--{boundary}--'.encode('utf-8')
|
|
|
|
response = requests.post(proxy_url, headers=headers, data=data)
|
|
response.raise_for_status()
|
|
logger.info(f"Successfully uploaded {filepath}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to upload {filepath}: {e}")
|
|
return False
|
|
|
|
def _update_status(self, order_id, status):
|
|
"""Update order status in database (placeholder)"""
|
|
|
|
logger.info(f"Updated order {order_id} status to {status}")
|
|
|
|
url = f"{config.API_URL}/update_dicom_upstatus"
|
|
|
|
payload = {
|
|
"GdcUpreqID": order_id,
|
|
"GdcUpreq_DicomUpStatus": status
|
|
}
|
|
|
|
headers = {
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url, json=payload, headers=headers)
|
|
response.raise_for_status()
|
|
|
|
response_data = response.json()
|
|
|
|
if response_data.get("status") == "OK":
|
|
logger.info(f"Successfully updated order {order_id} status to {status}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to update order {order_id} status: {response_data.get('message')}")
|
|
return False
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Error updating order {order_id} status: {e}")
|
|
return False
|
|
except ValueError as e:
|
|
logger.error(f"Invalid response format for order {order_id}: {e}")
|
|
return False
|
|
|
|
|
|
def process_pending_orders(self):
|
|
"""Process all pending orders"""
|
|
orders = self._get_pending_orders()
|
|
logger.info(f"Found {len(orders)} pending orders")
|
|
|
|
for order in orders:
|
|
try:
|
|
order_id = order.get('GdcUpreqID')
|
|
study_uid = order.get('GdcUpreq_StudyIUID')
|
|
series_uid = order.get('GdcUpreq_SeriesIUID')
|
|
sop_uid = order.get('GdcUpreq_SopIUID')
|
|
|
|
logger.info(f"Processing order {order_id} for study {study_uid}")
|
|
|
|
# self._update_status(order_id, 1) # ? PROGRES: apakah perlu?
|
|
|
|
# Retrieve DICOM files
|
|
dicom_files = self._retrieve_dicom(study_uid, series_uid, sop_uid)
|
|
logger.info(f"Retrieved {len(dicom_files)} DICOM files")
|
|
|
|
success_count = 0
|
|
failure_count = 0
|
|
|
|
# Upload each file
|
|
for filepath in dicom_files:
|
|
if self._upload_dicom_to_cloud(filepath):
|
|
success_count += 1
|
|
else:
|
|
failure_count += 1
|
|
|
|
# Clean up file after upload
|
|
os.remove(filepath)
|
|
|
|
# Update status
|
|
if failure_count == 0 and success_count > 0:
|
|
self._update_status(order_id, 2)
|
|
# elif success_count > 0:
|
|
# self._update_status(order_id, "PARTIALLY_COMPLETED")
|
|
else:
|
|
self._update_status(order_id, 3)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing order: {e}")
|
|
self._update_status(order.get('GdcUpreqID'), "FAILED")
|
|
|
|
def cleanup(self):
|
|
"""Clean up temporary files"""
|
|
try:
|
|
os.rmdir(self.temp_dir)
|
|
logger.info(f"Removed temporary directory: {self.temp_dir}")
|
|
except OSError:
|
|
logger.warning(f"Could not remove temporary directory: {self.temp_dir}. It may not be empty.")
|
|
|
|
def main():
|
|
uploader = DicomUploader(
|
|
pacs_host=config.PACS_HOST,
|
|
pacs_port=config.PACS_PORT,
|
|
pacs_ae_title=config.PACS_AE_TITLE,
|
|
local_ae_title=config.LOCAL_AE_TITLE,
|
|
proxy_url=config.PROXY_URL
|
|
)
|
|
|
|
try:
|
|
uploader.process_pending_orders()
|
|
finally:
|
|
uploader.cleanup()
|
|
|
|
logger.info("Finished processing orders")
|
|
exit(0)
|
|
|
|
if __name__ == "__main__":
|
|
main() |