diff --git a/config.py b/config.py new file mode 100644 index 0000000..a6a44be --- /dev/null +++ b/config.py @@ -0,0 +1,23 @@ +# config.py +# Configuration settings for the DICOM uploader + +# PACS Configuration +PACS_HOST = '128.199.154.150' # Replace with your PACS host +PACS_PORT = 11112 # Replace with your PACS port +PACS_AE_TITLE = 'ABPACS' # Replace with your PACS AE Title +LOCAL_AE_TITLE = 'DCM UPLOADER' # Replace with your local AE Title + +# Go OHIF Proxy Configuration +PROXY_URL = 'http://128.199.154.150:5555' + +# API Configuration +API_URL = 'https://devone.aplikasi.web.id/one-api/mockup/godicomupreq/godicomupreq/get_uprequests' + +# Processing Configuration +MAX_RETRIES = 3 +RETRY_DELAY = 5 # seconds +BATCH_SIZE = 10 # Process orders in batches + +# Logging Configuration +LOG_LEVEL = 'INFO' +LOG_FILE = 'server.log' \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..5ca58ca --- /dev/null +++ b/main.py @@ -0,0 +1,290 @@ +import config # config.py + +import os +import time +import logging +import json +import requests +import tempfile +from pynetdicom import AE, StoragePresentationContexts, evt, build_role +from pydicom.dataset import Dataset +from pydicom import dcmread +import uuid +from datetime import datetime, timedelta +from pynetdicom.sop_class import PatientRootQueryRetrieveInformationModelGet + + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + handlers=[logging.StreamHandler()] +) +logger = logging.getLogger('DicomUploader') + +class DicomUploader: + def __init__(self, pacs_host, pacs_port, pacs_ae_title, local_ae_title, proxy_url): + """ + Initialize the DICOM uploader + + Args: + pacs_host: Hostname/IP of the PACS server + pacs_port: Port number of the PACS server + pacs_ae_title: AE Title of the PACS server + local_ae_title: Local AE Title to use for connection + proxy_url: URL of the Go OHIF Proxy + """ + self.pacs_host = pacs_host + self.pacs_port = pacs_port + self.pacs_ae_title = pacs_ae_title + self.local_ae_title = local_ae_title + self.proxy_url = proxy_url + self.temp_dir = tempfile.mkdtemp() + logger.info(f"Created temporary directory: {self.temp_dir}") + + def _get_pending_orders(self): + """ + Fetch pending orders from the API + + Uses startDate, endDate, and status parameters to filter results + Status 0 indicates pending orders + """ + # Using the corrected endpoint + url = "https://devone.aplikasi.web.id/one-api/mockup/godicomupreq/godicomupreq/get_uprequests" + + # Add required query parameters + today = datetime.now() + yesterday = today - timedelta(days=1) + + params = { + 'startDate': yesterday.strftime('%Y-%m-%d'), + 'endDate': today.strftime('%Y-%m-%d'), + 'status': '0' # Status 0 for pending orders + } + + try: + response = requests.get(url, params=params) + response.raise_for_status() + response_data = response.json() + + # Parse the nested JSON structure + if response_data.get("status") == "OK" and "data" in response_data: + return response_data["data"] + else: + logger.warning("API returned a response without data or with non-OK status") + return [] + except requests.exceptions.RequestException as e: + logger.error(f"Failed to fetch pending orders: {e}") + return [] + + def _retrieve_dicom(self, study_instance_uid, series_instance_uid=None, sop_instance_uid=None): + """ + Retrieve DICOM files from PACS using C-GET + + Returns: + List of paths to retrieved DICOM files + """ + ae = AE(ae_title=self.local_ae_title) + ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet) + + storage_uids = [ + '1.2.840.10008.5.1.4.1.1.1', # CR Storage + '1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage + '1.2.840.10008.5.1.4.1.1.2', # CT Image Storage + '1.2.840.10008.5.1.4.1.1.4', # MR Image Storage + '1.2.840.10008.5.1.4.1.1.7', # Secondary Capture Image Storage + '1.2.840.10008.5.1.4.1.1.6.1', # Ultrasound Image Storage + '1.2.840.10008.5.1.4.1.1.128', # PET Image Storage + '1.2.840.10008.5.1.4.1.1.20', # Nuclear Medicine Image Storage + '1.2.840.10008.5.1.4.1.1.9.1.1', # 12-lead ECG Waveform Storage + '1.2.840.10008.5.1.4.1.1.9.1.2', # General ECG Waveform Storage + ] + + # * == Kalau error terkait context ketika C-STORE RSP cek disini + ext_neg = [] + for uid in storage_uids: + ae.add_requested_context(uid) + role = build_role(uid, scp_role=True) + ext_neg.append(role) + + retrieved_files = [] + + def handle_store(event): + """Handle a C-STORE request from the peer""" + dataset = event.dataset + + # Save the dataset to a temporary file + filename = f"{dataset.SOPInstanceUID}.dcm" + filepath = os.path.join(self.temp_dir, filename) + dataset.save_as(filepath) + + retrieved_files.append(filepath) + + return 0x0000 # Success + + handlers = [(evt.EVT_C_STORE, handle_store)] + + try: + # Create association with the peer + assoc = ae.associate( + self.pacs_host, + self.pacs_port, + ae_title=self.pacs_ae_title, + ext_neg=ext_neg, # No extended negotiation + evt_handlers=handlers + ) + + if assoc.is_established: + ds = Dataset() + + # For IMAGE level retrieval + if sop_instance_uid: + ds.QueryRetrieveLevel = 'IMAGE' + ds.StudyInstanceUID = study_instance_uid + ds.SeriesInstanceUID = series_instance_uid + ds.SOPInstanceUID = sop_instance_uid + elif series_instance_uid: + ds.QueryRetrieveLevel = 'SERIES' + ds.StudyInstanceUID = study_instance_uid + ds.SeriesInstanceUID = series_instance_uid + else: + ds.QueryRetrieveLevel = 'STUDY' + ds.StudyInstanceUID = study_instance_uid + + # Send C-GET request and collect responses + responses = assoc.send_c_get(ds, PatientRootQueryRetrieveInformationModelGet) + + # for (status, identifier) in responses: + # if status: + # print('C-GET query status: 0x{0:04x}'.format(status.Status)) + # else: + # print('Connection timed out, was aborted or received invalid response') + + for status, identifier in responses: + if status: + status_int = getattr(status, 'Status', 0) + logger.info(f"C-GET status: 0x{status_int:04x}") + + # Check for specific status codes + if status_int == 0xa702: # Failed SOP Instance + if identifier and hasattr(identifier, 'FailedSOPInstanceUIDList'): + logger.error(f"Failed to retrieve: {identifier.FailedSOPInstanceUIDList}") + + # Release the association + assoc.release() + else: + logger.error("Association with PACS failed") + + except Exception as e: + logger.error(f"Error during DICOM retrieval: {e}") + + return retrieved_files + + def _upload_dicom_to_cloud(self, filepath): + """Upload a DICOM file to Google Healthcare API via Go OHIF Proxy""" + proxy_url = f"{self.proxy_url}/dicomWeb/studies" + boundary = f"bd{uuid.uuid4()}" + + try: + with open(filepath, 'rb') as dicom_file: + dicom_content = dicom_file.read() + + headers = { + 'Content-Type': f'multipart/related; type="application/dicom"; boundary="{boundary}"', + 'Accept': '*/*', + 'Origin': self.proxy_url, + } + + # Format payload with boundary + payload = f'--{boundary}\r\n' + payload += 'Content-Type: application/dicom\r\n\r\n' + + # Combine payload with binary data and closing boundary + data = payload.encode('utf-8') + dicom_content + f'\r\n--{boundary}--'.encode('utf-8') + + response = requests.post(proxy_url, headers=headers, data=data) + response.raise_for_status() + logger.info(f"Successfully uploaded {filepath}") + return True + + except Exception as e: + logger.error(f"Failed to upload {filepath}: {e}") + return False + + def _update_status(self, order_id, status): + """Update order status in database (placeholder)""" + logger.info(f"Updated order {order_id} status to {status}") + # Implement actual status update logic here + + def process_pending_orders(self): + """Process all pending orders""" + orders = self._get_pending_orders() + logger.info(f"Found {len(orders)} pending orders") + + for order in orders: + try: + order_id = order.get('GdcUpreqID') + study_uid = order.get('GdcUpreq_StudyIUID') + series_uid = order.get('GdcUpreq_SeriesIUID') + sop_uid = order.get('GdcUpreq_SopIUID') + + logger.info(f"Processing order {order_id} for study {study_uid}") + + # Retrieve DICOM files + dicom_files = self._retrieve_dicom(study_uid, series_uid, sop_uid) + logger.info(f"Retrieved {len(dicom_files)} DICOM files") + + success_count = 0 + failure_count = 0 + + # Upload each file + for filepath in dicom_files: + if self._upload_dicom_to_cloud(filepath): + success_count += 1 + else: + failure_count += 1 + + # Clean up file after upload + os.remove(filepath) + + # Update status + if failure_count == 0 and success_count > 0: + self._update_status(order_id, "COMPLETED") + elif success_count > 0: + self._update_status(order_id, "PARTIALLY_COMPLETED") + else: + self._update_status(order_id, "FAILED") + + except Exception as e: + logger.error(f"Error processing order: {e}") + self._update_status(order.get('GdcUpreqID'), "FAILED") + + def cleanup(self): + """Clean up temporary files""" + try: + os.rmdir(self.temp_dir) + logger.info(f"Removed temporary directory: {self.temp_dir}") + except OSError: + logger.warning(f"Could not remove temporary directory: {self.temp_dir}. It may not be empty.") + +def main(): + uploader = DicomUploader( + pacs_host=config.PACS_HOST, + pacs_port=config.PACS_PORT, + pacs_ae_title=config.PACS_AE_TITLE, + local_ae_title=config.LOCAL_AE_TITLE, + proxy_url=config.PROXY_URL + ) + + try: + uploader.process_pending_orders() + finally: + uploader.cleanup() + + logger.info("Finished processing orders") + # exit + exit(0) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..cc3fc6a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pynetdicom>=2.0.0 +pydicom>=2.3.0 +requests>=2.28.0 \ No newline at end of file diff --git a/test.http b/test.http new file mode 100644 index 0000000..0465c2d --- /dev/null +++ b/test.http @@ -0,0 +1,3 @@ +@host = http://128.199.154.150:5555 + +GET {{host}}/dicomWeb/studies \ No newline at end of file