init: bisa cget 1dcm dan upload
This commit is contained in:
23
config.py
Normal file
23
config.py
Normal file
@@ -0,0 +1,23 @@
|
||||
# config.py
|
||||
# Configuration settings for the DICOM uploader
|
||||
|
||||
# PACS Configuration
|
||||
PACS_HOST = '128.199.154.150' # Replace with your PACS host
|
||||
PACS_PORT = 11112 # Replace with your PACS port
|
||||
PACS_AE_TITLE = 'ABPACS' # Replace with your PACS AE Title
|
||||
LOCAL_AE_TITLE = 'DCM UPLOADER' # Replace with your local AE Title
|
||||
|
||||
# Go OHIF Proxy Configuration
|
||||
PROXY_URL = 'http://128.199.154.150:5555'
|
||||
|
||||
# API Configuration
|
||||
API_URL = 'https://devone.aplikasi.web.id/one-api/mockup/godicomupreq/godicomupreq/get_uprequests'
|
||||
|
||||
# Processing Configuration
|
||||
MAX_RETRIES = 3
|
||||
RETRY_DELAY = 5 # seconds
|
||||
BATCH_SIZE = 10 # Process orders in batches
|
||||
|
||||
# Logging Configuration
|
||||
LOG_LEVEL = 'INFO'
|
||||
LOG_FILE = 'server.log'
|
||||
290
main.py
Normal file
290
main.py
Normal file
@@ -0,0 +1,290 @@
|
||||
import config # config.py
|
||||
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
import requests
|
||||
import tempfile
|
||||
from pynetdicom import AE, StoragePresentationContexts, evt, build_role
|
||||
from pydicom.dataset import Dataset
|
||||
from pydicom import dcmread
|
||||
import uuid
|
||||
from datetime import datetime, timedelta
|
||||
from pynetdicom.sop_class import PatientRootQueryRetrieveInformationModelGet
|
||||
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S',
|
||||
handlers=[logging.StreamHandler()]
|
||||
)
|
||||
logger = logging.getLogger('DicomUploader')
|
||||
|
||||
class DicomUploader:
|
||||
def __init__(self, pacs_host, pacs_port, pacs_ae_title, local_ae_title, proxy_url):
|
||||
"""
|
||||
Initialize the DICOM uploader
|
||||
|
||||
Args:
|
||||
pacs_host: Hostname/IP of the PACS server
|
||||
pacs_port: Port number of the PACS server
|
||||
pacs_ae_title: AE Title of the PACS server
|
||||
local_ae_title: Local AE Title to use for connection
|
||||
proxy_url: URL of the Go OHIF Proxy
|
||||
"""
|
||||
self.pacs_host = pacs_host
|
||||
self.pacs_port = pacs_port
|
||||
self.pacs_ae_title = pacs_ae_title
|
||||
self.local_ae_title = local_ae_title
|
||||
self.proxy_url = proxy_url
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
logger.info(f"Created temporary directory: {self.temp_dir}")
|
||||
|
||||
def _get_pending_orders(self):
|
||||
"""
|
||||
Fetch pending orders from the API
|
||||
|
||||
Uses startDate, endDate, and status parameters to filter results
|
||||
Status 0 indicates pending orders
|
||||
"""
|
||||
# Using the corrected endpoint
|
||||
url = "https://devone.aplikasi.web.id/one-api/mockup/godicomupreq/godicomupreq/get_uprequests"
|
||||
|
||||
# Add required query parameters
|
||||
today = datetime.now()
|
||||
yesterday = today - timedelta(days=1)
|
||||
|
||||
params = {
|
||||
'startDate': yesterday.strftime('%Y-%m-%d'),
|
||||
'endDate': today.strftime('%Y-%m-%d'),
|
||||
'status': '0' # Status 0 for pending orders
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.get(url, params=params)
|
||||
response.raise_for_status()
|
||||
response_data = response.json()
|
||||
|
||||
# Parse the nested JSON structure
|
||||
if response_data.get("status") == "OK" and "data" in response_data:
|
||||
return response_data["data"]
|
||||
else:
|
||||
logger.warning("API returned a response without data or with non-OK status")
|
||||
return []
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Failed to fetch pending orders: {e}")
|
||||
return []
|
||||
|
||||
def _retrieve_dicom(self, study_instance_uid, series_instance_uid=None, sop_instance_uid=None):
|
||||
"""
|
||||
Retrieve DICOM files from PACS using C-GET
|
||||
|
||||
Returns:
|
||||
List of paths to retrieved DICOM files
|
||||
"""
|
||||
ae = AE(ae_title=self.local_ae_title)
|
||||
ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)
|
||||
|
||||
storage_uids = [
|
||||
'1.2.840.10008.5.1.4.1.1.1', # CR Storage
|
||||
'1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage
|
||||
'1.2.840.10008.5.1.4.1.1.2', # CT Image Storage
|
||||
'1.2.840.10008.5.1.4.1.1.4', # MR Image Storage
|
||||
'1.2.840.10008.5.1.4.1.1.7', # Secondary Capture Image Storage
|
||||
'1.2.840.10008.5.1.4.1.1.6.1', # Ultrasound Image Storage
|
||||
'1.2.840.10008.5.1.4.1.1.128', # PET Image Storage
|
||||
'1.2.840.10008.5.1.4.1.1.20', # Nuclear Medicine Image Storage
|
||||
'1.2.840.10008.5.1.4.1.1.9.1.1', # 12-lead ECG Waveform Storage
|
||||
'1.2.840.10008.5.1.4.1.1.9.1.2', # General ECG Waveform Storage
|
||||
]
|
||||
|
||||
# * == Kalau error terkait context ketika C-STORE RSP cek disini
|
||||
ext_neg = []
|
||||
for uid in storage_uids:
|
||||
ae.add_requested_context(uid)
|
||||
role = build_role(uid, scp_role=True)
|
||||
ext_neg.append(role)
|
||||
|
||||
retrieved_files = []
|
||||
|
||||
def handle_store(event):
|
||||
"""Handle a C-STORE request from the peer"""
|
||||
dataset = event.dataset
|
||||
|
||||
# Save the dataset to a temporary file
|
||||
filename = f"{dataset.SOPInstanceUID}.dcm"
|
||||
filepath = os.path.join(self.temp_dir, filename)
|
||||
dataset.save_as(filepath)
|
||||
|
||||
retrieved_files.append(filepath)
|
||||
|
||||
return 0x0000 # Success
|
||||
|
||||
handlers = [(evt.EVT_C_STORE, handle_store)]
|
||||
|
||||
try:
|
||||
# Create association with the peer
|
||||
assoc = ae.associate(
|
||||
self.pacs_host,
|
||||
self.pacs_port,
|
||||
ae_title=self.pacs_ae_title,
|
||||
ext_neg=ext_neg, # No extended negotiation
|
||||
evt_handlers=handlers
|
||||
)
|
||||
|
||||
if assoc.is_established:
|
||||
ds = Dataset()
|
||||
|
||||
# For IMAGE level retrieval
|
||||
if sop_instance_uid:
|
||||
ds.QueryRetrieveLevel = 'IMAGE'
|
||||
ds.StudyInstanceUID = study_instance_uid
|
||||
ds.SeriesInstanceUID = series_instance_uid
|
||||
ds.SOPInstanceUID = sop_instance_uid
|
||||
elif series_instance_uid:
|
||||
ds.QueryRetrieveLevel = 'SERIES'
|
||||
ds.StudyInstanceUID = study_instance_uid
|
||||
ds.SeriesInstanceUID = series_instance_uid
|
||||
else:
|
||||
ds.QueryRetrieveLevel = 'STUDY'
|
||||
ds.StudyInstanceUID = study_instance_uid
|
||||
|
||||
# Send C-GET request and collect responses
|
||||
responses = assoc.send_c_get(ds, PatientRootQueryRetrieveInformationModelGet)
|
||||
|
||||
# for (status, identifier) in responses:
|
||||
# if status:
|
||||
# print('C-GET query status: 0x{0:04x}'.format(status.Status))
|
||||
# else:
|
||||
# print('Connection timed out, was aborted or received invalid response')
|
||||
|
||||
for status, identifier in responses:
|
||||
if status:
|
||||
status_int = getattr(status, 'Status', 0)
|
||||
logger.info(f"C-GET status: 0x{status_int:04x}")
|
||||
|
||||
# Check for specific status codes
|
||||
if status_int == 0xa702: # Failed SOP Instance
|
||||
if identifier and hasattr(identifier, 'FailedSOPInstanceUIDList'):
|
||||
logger.error(f"Failed to retrieve: {identifier.FailedSOPInstanceUIDList}")
|
||||
|
||||
# Release the association
|
||||
assoc.release()
|
||||
else:
|
||||
logger.error("Association with PACS failed")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during DICOM retrieval: {e}")
|
||||
|
||||
return retrieved_files
|
||||
|
||||
def _upload_dicom_to_cloud(self, filepath):
|
||||
"""Upload a DICOM file to Google Healthcare API via Go OHIF Proxy"""
|
||||
proxy_url = f"{self.proxy_url}/dicomWeb/studies"
|
||||
boundary = f"bd{uuid.uuid4()}"
|
||||
|
||||
try:
|
||||
with open(filepath, 'rb') as dicom_file:
|
||||
dicom_content = dicom_file.read()
|
||||
|
||||
headers = {
|
||||
'Content-Type': f'multipart/related; type="application/dicom"; boundary="{boundary}"',
|
||||
'Accept': '*/*',
|
||||
'Origin': self.proxy_url,
|
||||
}
|
||||
|
||||
# Format payload with boundary
|
||||
payload = f'--{boundary}\r\n'
|
||||
payload += 'Content-Type: application/dicom\r\n\r\n'
|
||||
|
||||
# Combine payload with binary data and closing boundary
|
||||
data = payload.encode('utf-8') + dicom_content + f'\r\n--{boundary}--'.encode('utf-8')
|
||||
|
||||
response = requests.post(proxy_url, headers=headers, data=data)
|
||||
response.raise_for_status()
|
||||
logger.info(f"Successfully uploaded {filepath}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to upload {filepath}: {e}")
|
||||
return False
|
||||
|
||||
def _update_status(self, order_id, status):
|
||||
"""Update order status in database (placeholder)"""
|
||||
logger.info(f"Updated order {order_id} status to {status}")
|
||||
# Implement actual status update logic here
|
||||
|
||||
def process_pending_orders(self):
|
||||
"""Process all pending orders"""
|
||||
orders = self._get_pending_orders()
|
||||
logger.info(f"Found {len(orders)} pending orders")
|
||||
|
||||
for order in orders:
|
||||
try:
|
||||
order_id = order.get('GdcUpreqID')
|
||||
study_uid = order.get('GdcUpreq_StudyIUID')
|
||||
series_uid = order.get('GdcUpreq_SeriesIUID')
|
||||
sop_uid = order.get('GdcUpreq_SopIUID')
|
||||
|
||||
logger.info(f"Processing order {order_id} for study {study_uid}")
|
||||
|
||||
# Retrieve DICOM files
|
||||
dicom_files = self._retrieve_dicom(study_uid, series_uid, sop_uid)
|
||||
logger.info(f"Retrieved {len(dicom_files)} DICOM files")
|
||||
|
||||
success_count = 0
|
||||
failure_count = 0
|
||||
|
||||
# Upload each file
|
||||
for filepath in dicom_files:
|
||||
if self._upload_dicom_to_cloud(filepath):
|
||||
success_count += 1
|
||||
else:
|
||||
failure_count += 1
|
||||
|
||||
# Clean up file after upload
|
||||
os.remove(filepath)
|
||||
|
||||
# Update status
|
||||
if failure_count == 0 and success_count > 0:
|
||||
self._update_status(order_id, "COMPLETED")
|
||||
elif success_count > 0:
|
||||
self._update_status(order_id, "PARTIALLY_COMPLETED")
|
||||
else:
|
||||
self._update_status(order_id, "FAILED")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing order: {e}")
|
||||
self._update_status(order.get('GdcUpreqID'), "FAILED")
|
||||
|
||||
def cleanup(self):
|
||||
"""Clean up temporary files"""
|
||||
try:
|
||||
os.rmdir(self.temp_dir)
|
||||
logger.info(f"Removed temporary directory: {self.temp_dir}")
|
||||
except OSError:
|
||||
logger.warning(f"Could not remove temporary directory: {self.temp_dir}. It may not be empty.")
|
||||
|
||||
def main():
|
||||
uploader = DicomUploader(
|
||||
pacs_host=config.PACS_HOST,
|
||||
pacs_port=config.PACS_PORT,
|
||||
pacs_ae_title=config.PACS_AE_TITLE,
|
||||
local_ae_title=config.LOCAL_AE_TITLE,
|
||||
proxy_url=config.PROXY_URL
|
||||
)
|
||||
|
||||
try:
|
||||
uploader.process_pending_orders()
|
||||
finally:
|
||||
uploader.cleanup()
|
||||
|
||||
logger.info("Finished processing orders")
|
||||
# exit
|
||||
exit(0)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
3
requirements.txt
Normal file
3
requirements.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
pynetdicom>=2.0.0
|
||||
pydicom>=2.3.0
|
||||
requests>=2.28.0
|
||||
Reference in New Issue
Block a user