package service import ( "context" "fmt" "log/slog" "path/filepath" "sync" "time" "mkiso-server/internal/config" "mkiso-server/pkg/dicom" ) // DicomService handles DICOM retrieval from PACS via storescp + movescu. type DicomService struct { cfg *config.Config portAllocMgr *portManager } // NewDicomService creates a new DicomService. func NewDicomService(cfg *config.Config) *DicomService { return &DicomService{ cfg: cfg, portAllocMgr: newPortManager(), } } // FetchDICOM retrieves all DICOM files for a single accession number. // It starts storescp, runs movescu, then stops storescp. // Returns the number of files retrieved, or an error. func (s *DicomService) FetchDICOM(ctx context.Context, accessionNumber, destDir string) (filesCount int, err error) { port, release := s.portAllocMgr.allocate(s.cfg.OurAE.BasePort, s.cfg.OurAE.PortRange) if port == 0 { return 0, fmt.Errorf("no available port for storescp") } defer release() return s.fetchDICOMWithPort(ctx, accessionNumber, destDir, port) } // FetchDICOMMultiple retrieves DICOM files for multiple accession numbers. // It starts storescp ONCE, runs movescu for each accession, then stops storescp. // Returns the total number of files retrieved, or an error. func (s *DicomService) FetchDICOMMultiple(ctx context.Context, accessionNumbers []string, destDir string) (totalFiles int, err error) { port, release := s.portAllocMgr.allocate(s.cfg.OurAE.BasePort, s.cfg.OurAE.PortRange) if port == 0 { return 0, fmt.Errorf("no available port for storescp") } defer release() // Start storescp once storescpCtx, cancelStorescp := context.WithCancel(ctx) defer cancelStorescp() resultCh, stop, err := dicom.StartStoresCP( storescpCtx, s.cfg.DCMTK.Storescp, s.cfg.OurAE.AETitle, port, destDir, ) if err != nil { return 0, fmt.Errorf("start storescp: %w", err) } defer stop() // Wait for storescp to be ready if err := waitForStorescpReady(resultCh); err != nil { return 0, err } // Run movescu for each accession for _, acc := range accessionNumbers { select { case <-ctx.Done(): stop() return totalFiles, ctx.Err() default: } studyUIDs, err := s.findStudyUIDs(ctx, acc) if err != nil { slog.Warn("findscu failed for accession", "accession", acc, "error", err, ) continue } for _, studyUID := range studyUIDs { exitCode, _, stderr, err := dicom.RunMoveSCUByStudyUID( ctx, s.cfg.DCMTK.Movescu, s.cfg.OurAE.AETitle, s.cfg.PACS.AETitle, s.cfg.PACS.Host, s.cfg.PACS.Port, port, studyUID, 300, // 5 min timeout ) if err != nil { slog.Warn("movescu failed for study uid", "accession", acc, "study_uid", studyUID, "exit_code", exitCode, "stderr", stderr, "error", err, ) continue } slog.Info("movescu completed", "accession", acc, "study_uid", studyUID, "exit_code", exitCode, ) } } // Count files retrieved filesCount, err := countFiles(destDir) if err != nil { slog.Warn("count files failed", "dir", destDir, "error", err) } if filesCount == 0 { return 0, fmt.Errorf("no DICOM data retrieved") } return filesCount, nil } // fetchDICOMWithPort uses a specific port for storescp + movescu. func (s *DicomService) fetchDICOMWithPort(ctx context.Context, accessionNumber, destDir string, port int) (filesCount int, err error) { storescpCtx, cancelStorescp := context.WithCancel(ctx) defer cancelStorescp() resultCh, stop, err := dicom.StartStoresCP( storescpCtx, s.cfg.DCMTK.Storescp, s.cfg.OurAE.AETitle, port, destDir, ) if err != nil { return 0, fmt.Errorf("start storescp: %w", err) } defer stop() // Wait for storescp to be ready if err := waitForStorescpReady(resultCh); err != nil { return 0, err } studyUIDs, err := s.findStudyUIDs(ctx, accessionNumber) if err != nil { return 0, err } for _, studyUID := range studyUIDs { exitCode, _, stderr, err := dicom.RunMoveSCUByStudyUID( ctx, s.cfg.DCMTK.Movescu, s.cfg.OurAE.AETitle, s.cfg.PACS.AETitle, s.cfg.PACS.Host, s.cfg.PACS.Port, port, studyUID, 300, // 5 min timeout ) if err != nil { return 0, fmt.Errorf("movescu failed for study uid %q (exit %d): %s (stderr: %s)", studyUID, exitCode, err, stderr) } slog.Info("movescu completed", "accession", accessionNumber, "study_uid", studyUID, "exit_code", exitCode, ) } // Count files filesCount, err = countFiles(destDir) if err != nil { slog.Warn("count files failed", "dir", destDir, "error", err) } if filesCount == 0 { return 0, fmt.Errorf("no DICOM data for accession %q", accessionNumber) } return filesCount, nil } func (s *DicomService) findStudyUIDs(ctx context.Context, accessionNumber string) ([]string, error) { findscuBin := filepath.Join(filepath.Dir(s.cfg.DCMTK.Movescu), "findscu") studyUIDs, _, stderr, err := dicom.RunFindSCUStudyUIDs( ctx, findscuBin, s.cfg.OurAE.AETitle, s.cfg.PACS.AETitle, s.cfg.PACS.Host, s.cfg.PACS.Port, accessionNumber, 60, ) if err != nil { return nil, fmt.Errorf("findscu failed for accession %q: %w (stderr: %s)", accessionNumber, err, stderr) } return studyUIDs, nil } // waitForStorescpReady waits for storescp to start or detects early failure. func waitForStorescpReady(resultCh <-chan dicom.StorescpResult) error { select { case result := <-resultCh: return fmt.Errorf("storescp exited prematurely (pid %d, err: %v, stderr: %s)", result.PID, result.Err, result.Stderr) case <-time.After(2 * time.Second): return nil } } // countFiles counts regular files in a directory (non-recursive). func countFiles(dir string) (int, error) { return dicom.CountFiles(dir) } // portManager manages a pool of allocated ports with mutex safety. type portManager struct { mu sync.Mutex portsInUse map[int]bool } func newPortManager() *portManager { return &portManager{ portsInUse: make(map[int]bool), } } // allocate picks a free port from [base, base+size). Returns 0 if none available. func (pm *portManager) allocate(base, size int) (int, func()) { pm.mu.Lock() defer pm.mu.Unlock() for i := 0; i < size; i++ { port := base + i if !pm.portsInUse[port] { pm.portsInUse[port] = true return port, func() { pm.mu.Lock() delete(pm.portsInUse, port) pm.mu.Unlock() } } } return 0, nil }