fix header process by date, add store log when killed, modularize workflow function

This commit is contained in:
mario
2025-05-31 16:46:05 +07:00
parent 06cd92013b
commit ebea3d531a
3 changed files with 259 additions and 168 deletions

391
main.py
View File

@@ -15,6 +15,7 @@ from utils.cleanup import register_exit_handlers, register_cleanup_dir
from services.dicom_finder import DicomFinder
from services.dicom_retriever import DicomRetriever
from services.dicom_sender import DicomSender
import signal
def parse_args():
"""Parse command line arguments."""
@@ -362,6 +363,142 @@ def send_file(args):
logger.error(f"Error sending file: {str(e)}")
sys.exit(1)
def process_single_study(study_uid, accession_number=None, log_dir=None):
"""
Process a single study: retrieve it, send to destination PACS,
and create detailed JSON logs.
Args:
study_uid: Study Instance UID
accession_number: Optional accession number
log_dir: Directory to save logs
Returns:
dict: Result information
"""
log_dir = log_dir or settings.JSON_OUTPUT_DIR
# Create services
finder = DicomFinder()
retriever = DicomRetriever()
sender = DicomSender()
# First, get the study metadata with a query if not provided
if not accession_number:
study_metadata = finder.find_study_by_uid(study_uid)
if not study_metadata:
logger.error(f"Study not found: {study_uid}")
return {'success': False, 'error': 'Study not found'}
accession_number = getattr(study_metadata, 'AccessionNumber', '')
else:
study_metadata = None
logger.info(f"Processing study: {accession_number} with Study_IUID ({study_uid})")
# STEP 1: Retrieve the study
retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
if not retrieve_result['success']:
logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
return {'success': False, 'error': f"Failed to retrieve study: {retrieve_result['status']}"}
logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")
# STEP 2: Send the study to destination PACS
send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
if not send_result['success']:
logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS")
# STEP 3: Find all series and create detailed logs
series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
# Get metadata from study_metadata if available, otherwise from the first element of study list
if study_metadata:
study_date = getattr(study_metadata, 'StudyDate', '')
study_time = getattr(study_metadata, 'StudyTime', '000000')
patient_id = getattr(study_metadata, 'PatientID', '')
study_description = getattr(study_metadata, 'StudyDescription', '')
else:
# We need to query for it again
study_details = finder.find_study_by_uid(study_uid)
study_date = getattr(study_details, 'StudyDate', '') if study_details else ''
study_time = getattr(study_details, 'StudyTime', '000000') if study_details else '000000'
patient_id = getattr(study_details, 'PatientID', '') if study_details else ''
study_description = getattr(study_details, 'StudyDescription', '') if study_details else ''
study_datetime = f"{study_date}{study_time}"
study_log = {
'Study_IUID': study_uid,
'AccessionNumber': accession_number,
'PatientID': patient_id,
'StudyDescription': study_description,
'StudyDateTime': study_datetime,
'CstoreSuccess': send_result['success'],
'Series': []
}
for series in series_list:
series_uid = series.SeriesInstanceUID
# Get first instance for the series
instance = finder.find_first_instance_for_series(study_uid, series_uid)
series_info = {
'Series_IUID': getattr(series, 'SeriesInstanceUID', ''),
'SeriesNumber': getattr(series, 'SeriesNumber', ''),
'SeriesDescription': getattr(series, 'SeriesDescription', ''),
'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''),
'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else ''
}
study_log['Series'].append(series_info)
# STEP 4: Send study_log to HIS API
his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
try:
headers = {
'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==',
'Content-Type': 'application/json'
}
response = requests.post(his_url, json=study_log, headers=headers)
# Add response data to the log
study_log['HisApiResponse'] = {
'StatusCode': response.status_code,
'ResponseText': response.text,
'Success': False,
'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
# Safer JSON parsing
try:
response_data = response.json() if response.content else {}
study_log['HisApiResponse']['ResponseData'] = response_data
if response.status_code == 200 and response_data.get('OK') == "1":
study_log['HisApiResponse']['Success'] = True
logger.info(f"Successfully sent JSON {accession_number} to HIS API")
return {'success': True, 'study_log': study_log, 'his_success': True}
else:
error_msg = response_data.get('message', 'Unknown error')
logger.error(f"Failed to send JSON for {accession_number} to HIS API: {error_msg}. Study_IUID: {study_uid}")
return {'success': True, 'study_log': study_log, 'his_success': False}
except ValueError:
study_log['HisApiResponse']['ParseError'] = "Invalid JSON response"
logger.error(f"Failed to parse response for {accession_number}, invalid JSON response. Status code: {response.status_code}")
return {'success': True, 'study_log': study_log, 'his_success': False}
except Exception as e:
study_log['HisApiResponse'] = {
'Success': False,
'Error': str(e),
'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
return {'success': True, 'study_log': study_log, 'his_success': False, 'error': str(e)}
def process_workflow(args):
"""
Process the full workflow: find studies by date range, retrieve them, and send to destination PACS.
@@ -387,11 +524,10 @@ def process_workflow(args):
# Create services
finder = DicomFinder()
retriever = DicomRetriever()
sender = DicomSender()
# Create directory for logs
create_directory_if_not_exists(log_dir)
create_directory_if_not_exists("logs")
# STEP 1: Find studies in the date range
try:
@@ -411,12 +547,30 @@ def process_workflow(args):
save_json_data(studies_summary, f"studies_{start_date}_to_{end_date}.json", log_dir)
# STEP 2, 3, 4: For each study, retrieve, send, and log details
his_log_filename = f"sendtohis_{start_date}_{end_date}.json"
his_log_path = os.path.join("logs", his_log_filename)
his_log = []
his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json"
his_fail_log_path = os.path.join("logs", his_fail_log_filename)
his_fail_log = []
his_log_filename = f"sendtohis_{start_date}_{end_date}.json"
his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json"
# Register signal handler for SIGINT
original_sigint_handler = signal.getsignal(signal.SIGINT)
def sigint_handler(sig, frame):
logger.warning("Process interrupted by user. Saving logs before exiting...")
# Save HIS logs
if his_log:
save_json_data(his_log, his_log_filename, "logs")
logger.info(f"Saved {len(his_log)} successful logs to {his_log_filename}")
if his_fail_log:
save_json_data(his_fail_log, his_fail_log_filename, "logs")
logger.info(f"Saved {len(his_fail_log)} failed logs to {his_fail_log_filename}")
# Restore original signal handler and raise KeyboardInterrupt
signal.signal(signal.SIGINT, original_sigint_handler)
sys.exit(1)
# Set up our custom signal handler
signal.signal(signal.SIGINT, sigint_handler)
for i, study in enumerate(studies):
accession_number = study.AccessionNumber
@@ -429,86 +583,35 @@ def process_workflow(args):
logger.info(f"Skipping study {study_uid} - log file already exists")
continue
# STEP 2: Retrieve the study
try:
retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
if not retrieve_result['success']:
logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
result = process_single_study(study_uid, accession_number, log_dir)
if not result['success']:
logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}")
continue
logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")
# Record results for HIS logs
if result.get('his_success', False):
his_log.append(result['study_log'])
else:
his_fail_log.append(result['study_log'])
# STEP 3: Send the study to destination PACS
send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
if not send_result['success']:
logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS")
# STEP 4: Find all series and create detailed logs
series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
study_date = getattr(study, 'StudyDate', '')
study_time = getattr(study, 'StudyTime', '000000') # Default to midnight if StudyTime is not available
study_datetime = f"{study_date}{study_time}"
study_log = {
'Study_IUID': study_uid,
'AccessionNumber': getattr(study, 'AccessionNumber', ''),
'PatientID': getattr(study, 'PatientID', ''), # MedrecID
'StudyDescription': getattr(study, 'StudyDescription', ''),
'StudyDateTime': study_datetime,
'CstoreSuccess': send_result['success'],
'Series': []
}
for series in series_list:
series_uid = series.SeriesInstanceUID
# Save progress incrementally every 5 studies
if (i + 1) % 5 == 0:
if his_log:
save_json_data(his_log, his_log_filename, "logs")
if his_fail_log:
save_json_data(his_fail_log, his_fail_log_filename, "logs")
logger.info(f"Saved progress after processing {i+1} studies")
# Get first instance for the series
instance = finder.find_first_instance_for_series(study_uid, series_uid)
series_info = {
'Series_IUID': getattr(series, 'SeriesInstanceUID', ''),
'SeriesNumber': getattr(series, 'SeriesNumber', ''),
'SeriesDescription': getattr(series, 'SeriesDescription', ''),
'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''),
'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else ''
}
study_log['Series'].append(series_info)
# Save the detailed log
# save_json_data(study_log, f"{study_uid}.json", log_dir)
# logger.info(f"Created detailed log for study {study_uid}")
# STEP 5: Send study_log to HIS API
his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
try:
# Header tembak API HIS
headers = {
'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==',
'Content-Type': 'application/json'
}
response = requests.post(his_url, json=study_log, headers=headers)
# Gunakan ini untuk development
# response = requests.post(his_url, json=study_log)
if response.status_code == 200 and response.json().get('OK') == "1":
his_log.append(study_log)
logger.info(f"Successfully sent JSON {accession_number} to HIS API")
else:
his_fail_log.append(study_log)
logger.error(f"Failed to send JSON for {accession_number} to HIS API: {response.msg}. Study_IUID: {study_uid}")
except Exception as e:
logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
except Exception as e:
logger.error(f"Error processing study {accession_number}: {str(e)}")
continue
# Save HIS log
# Restore original signal handler
signal.signal(signal.SIGINT, original_sigint_handler)
# Final Save HIS logs
if his_log:
save_json_data(his_log, his_log_filename, "logs")
if his_fail_log:
@@ -522,8 +625,7 @@ def process_workflow(args):
def process_workflow_by_study(args):
"""
Process the full workflow for a specific study: retrieve it, send to destination PACS,
and create detailed JSON logs.
Process the full workflow for a specific study.
Args:
args: Command line arguments
@@ -533,103 +635,64 @@ def process_workflow_by_study(args):
logger.info(f"Starting full workflow process for study: {study_uid}")
# Create services
finder = DicomFinder()
retriever = DicomRetriever()
sender = DicomSender()
# Create directory for logs
create_directory_if_not_exists(log_dir)
create_directory_if_not_exists("logs")
# Process the study
try:
# First, get the study metadata with a query
study_metadata = finder.find_study_by_uid(study_uid)
if not study_metadata:
logger.error(f"Study not found: {study_uid}")
sys.exit(1)
accession_number = getattr(study_metadata, 'AccessionNumber', '')
logger.info(f"Processing study: {accession_number} with Study_IUID ({study_uid})")
# STEP 1: Retrieve the study
retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
if not retrieve_result['success']:
logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
sys.exit(1)
logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")
# STEP 2: Send the study to destination PACS
send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
if not send_result['success']:
logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS")
# STEP 3: Find all series and create detailed logs
series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
study_date = getattr(study_metadata, 'StudyDate', '')
study_time = getattr(study_metadata, 'StudyTime', '000000') # Default to midnight if StudyTime is not available
study_datetime = f"{study_date}{study_time}"
# Register signal handler for SIGINT
original_sigint_handler = signal.getsignal(signal.SIGINT)
# Create empty containers for logs
his_log = []
his_fail_log = []
his_log_filename = f"sendtohis_study_{study_uid}.json"
his_fail_log_filename = f"fail_sendtohis_study_{study_uid}.json"
study_log = {
'Study_IUID': study_uid,
'AccessionNumber': getattr(study_metadata, 'AccessionNumber', ''),
'PatientID': getattr(study_metadata, 'PatientID', ''), # MedrecID
'StudyDescription': getattr(study_metadata, 'StudyDescription', ''),
'StudyDateTime': study_datetime,
'CstoreSuccess': send_result['success'],
'Series': []
}
for series in series_list:
series_uid = series.SeriesInstanceUID
def sigint_handler(sig, frame):
logger.warning("Process interrupted by user. Saving logs before exiting...")
# Save individual study log
if 'result' in locals() and result.get('success', False):
save_json_data(result['study_log'], f"{study_uid}.json", log_dir)
# Get first instance for the series
instance = finder.find_first_instance_for_series(study_uid, series_uid)
series_info = {
'Series_IUID': getattr(series, 'SeriesInstanceUID', ''),
'SeriesNumber': getattr(series, 'SeriesNumber', ''),
'SeriesDescription': getattr(series, 'SeriesDescription', ''),
'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''),
'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else ''
}
study_log['Series'].append(series_info)
# STEP 4: Send study_log to HIS API
his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
try:
# Header tembak API HIS
headers = {
'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==',
'Content-Type': 'application/json'
}
response = requests.post(his_url, json=study_log, headers=headers)
# Pakai ini untuk Development
# response = requests.post(his_url, json=study_log)
if response.status_code == 200 and response.json().get('OK') == "1":
logger.info(f"Successfully sent JSON {accession_number} to HIS API")
# Save successful log
save_json_data([study_log], f"sendtohis_study_{study_uid}.json", "logs")
# Save HIS logs
if result.get('his_success', False):
save_json_data([result['study_log']], his_log_filename, "logs")
logger.info(f"Saved successful log to {his_log_filename}")
else:
logger.error(f"Failed to send JSON for {accession_number} to HIS API. Study_IUID: {study_uid}")
# Save failed log
save_json_data([study_log], f"fail_sendtohis_study_{study_uid}.json", "logs")
except Exception as e:
logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
save_json_data([result['study_log']], his_fail_log_filename, "logs")
logger.info(f"Saved failed log to {his_fail_log_filename}")
# Restore original signal handler and exit
signal.signal(signal.SIGINT, original_sigint_handler)
sys.exit(1)
# Set up our custom signal handler
signal.signal(signal.SIGINT, sigint_handler)
try:
result = process_single_study(study_uid, log_dir=log_dir)
if not result['success']:
logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}")
sys.exit(1)
# Save individual study log
save_json_data(result['study_log'], f"{study_uid}.json", log_dir)
# Save logs
if result.get('his_success', False):
save_json_data([result['study_log']], his_log_filename, "logs")
else:
save_json_data([result['study_log']], his_fail_log_filename, "logs")
logger.info(f"Completed processing study: {study_uid}")
except Exception as e:
logger.error(f"Error during study processing: {str(e)}")
sys.exit(1)
finally:
# Restore original signal handler
signal.signal(signal.SIGINT, original_sigint_handler)
def main():
"""Main function."""

View File

@@ -39,6 +39,7 @@ class DicomRetriever:
# Add storage presentation contexts (needed for receiving the images)
# for context in StoragePresentationContexts:
# self.ae.add_requested_context(context.abstract_syntax)
storage_uids = [
'1.2.840.10008.5.1.4.1.1.1', # CR Storage
'1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage

View File

@@ -6,11 +6,15 @@ import glob
import pydicom
import shutil
from pydicom.dataset import Dataset
from pynetdicom import AE, StoragePresentationContexts, evt
from pynetdicom import AE, StoragePresentationContexts, evt, build_role
from config import settings
from utils.logger import main_logger as logger
from utils.error_handler import DicomStoreError, dicom_retry
from utils.dicom_utils import create_directory_if_not_exists
from pynetdicom.sop_class import (
StudyRootQueryRetrieveInformationModelGet,
PatientRootQueryRetrieveInformationModelGet
)
class DicomSender:
"""
@@ -27,9 +31,32 @@ class DicomSender:
self.pacs_config = pacs_config or settings.DESTINATION_PACS
self.ae = AE(ae_title=settings.SOURCE_AET)
# Add storage presentation contexts (all standard transfer syntaxes for each SOP class)
for context in StoragePresentationContexts:
self.ae.add_requested_context(context.abstract_syntax)
# Add the Query/Retrieve SOP classes
self.ae.add_requested_context(StudyRootQueryRetrieveInformationModelGet)
self.ae.add_requested_context(PatientRootQueryRetrieveInformationModelGet)
# Add storage presentation contexts (needed for receiving the images)
# for context in StoragePresentationContexts:
# self.ae.add_requested_context(context.abstract_syntax)
storage_uids = [
'1.2.840.10008.5.1.4.1.1.1', # CR Storage
'1.2.840.10008.5.1.4.1.1.1.1', # Digital X-Ray Image Storage
'1.2.840.10008.5.1.4.1.1.2', # CT Image Storage
'1.2.840.10008.5.1.4.1.1.4', # MR Image Storage
'1.2.840.10008.5.1.4.1.1.7', # Secondary Capture Image Storage
'1.2.840.10008.5.1.4.1.1.6.1', # Ultrasound Image Storage
'1.2.840.10008.5.1.4.1.1.128', # PET Image Storage
'1.2.840.10008.5.1.4.1.1.20', # Nuclear Medicine Image Storage
'1.2.840.10008.5.1.4.1.1.9.1.1', # 12-lead ECG Waveform Storage
'1.2.840.10008.5.1.4.1.1.9.1.2', # General ECG Waveform Storage
]
self.ext_neg = []
for uid in storage_uids:
self.ae.add_requested_context(uid)
role = build_role(uid, scp_role=True)
self.ext_neg.append(role)
logger.info(f"DicomSender initialized with destination PACS: {self.pacs_config['aet']}@{self.pacs_config['host']}:{self.pacs_config['port']}")