261 lines
6.3 KiB
Go
261 lines
6.3 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"mkiso-server/internal/config"
|
|
"mkiso-server/pkg/dicom"
|
|
)
|
|
|
|
// DicomService handles DICOM retrieval from PACS via storescp + movescu.
|
|
type DicomService struct {
|
|
cfg *config.Config
|
|
portAllocMgr *portManager
|
|
}
|
|
|
|
// NewDicomService creates a new DicomService.
|
|
func NewDicomService(cfg *config.Config) *DicomService {
|
|
return &DicomService{
|
|
cfg: cfg,
|
|
portAllocMgr: newPortManager(),
|
|
}
|
|
}
|
|
|
|
// FetchDICOM retrieves all DICOM files for a single accession number.
|
|
// It starts storescp, runs movescu, then stops storescp.
|
|
// Returns the number of files retrieved, or an error.
|
|
func (s *DicomService) FetchDICOM(ctx context.Context, accessionNumber, destDir string) (filesCount int, err error) {
|
|
port, release := s.portAllocMgr.allocate(s.cfg.OurAE.BasePort, s.cfg.OurAE.PortRange)
|
|
if port == 0 {
|
|
return 0, fmt.Errorf("no available port for storescp")
|
|
}
|
|
defer release()
|
|
|
|
return s.fetchDICOMWithPort(ctx, accessionNumber, destDir, port)
|
|
}
|
|
|
|
// FetchDICOMMultiple retrieves DICOM files for multiple accession numbers.
|
|
// It starts storescp ONCE, runs movescu for each accession, then stops storescp.
|
|
// Returns the total number of files retrieved, or an error.
|
|
func (s *DicomService) FetchDICOMMultiple(ctx context.Context, accessionNumbers []string, destDir string) (totalFiles int, err error) {
|
|
port, release := s.portAllocMgr.allocate(s.cfg.OurAE.BasePort, s.cfg.OurAE.PortRange)
|
|
if port == 0 {
|
|
return 0, fmt.Errorf("no available port for storescp")
|
|
}
|
|
defer release()
|
|
|
|
// Start storescp once
|
|
storescpCtx, cancelStorescp := context.WithCancel(ctx)
|
|
defer cancelStorescp()
|
|
|
|
resultCh, stop, err := dicom.StartStoresCP(
|
|
storescpCtx,
|
|
s.cfg.DCMTK.Storescp,
|
|
s.cfg.OurAE.AETitle,
|
|
port,
|
|
destDir,
|
|
)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("start storescp: %w", err)
|
|
}
|
|
defer stop()
|
|
|
|
// Wait for storescp to be ready
|
|
if err := waitForStorescpReady(resultCh); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Run movescu for each accession
|
|
for _, acc := range accessionNumbers {
|
|
select {
|
|
case <-ctx.Done():
|
|
stop()
|
|
return totalFiles, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
studyUIDs, err := s.findStudyUIDs(ctx, acc)
|
|
if err != nil {
|
|
slog.Warn("findscu failed for accession",
|
|
"accession", acc,
|
|
"error", err,
|
|
)
|
|
continue
|
|
}
|
|
|
|
for _, studyUID := range studyUIDs {
|
|
exitCode, _, stderr, err := dicom.RunMoveSCUByStudyUID(
|
|
ctx,
|
|
s.cfg.DCMTK.Movescu,
|
|
s.cfg.OurAE.AETitle,
|
|
s.cfg.PACS.AETitle,
|
|
s.cfg.PACS.Host,
|
|
s.cfg.PACS.Port,
|
|
port,
|
|
studyUID,
|
|
300, // 5 min timeout
|
|
)
|
|
if err != nil {
|
|
slog.Warn("movescu failed for study uid",
|
|
"accession", acc,
|
|
"study_uid", studyUID,
|
|
"exit_code", exitCode,
|
|
"stderr", stderr,
|
|
"error", err,
|
|
)
|
|
continue
|
|
}
|
|
slog.Info("movescu completed",
|
|
"accession", acc,
|
|
"study_uid", studyUID,
|
|
"exit_code", exitCode,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Count files retrieved
|
|
filesCount, err := countFiles(destDir)
|
|
if err != nil {
|
|
slog.Warn("count files failed", "dir", destDir, "error", err)
|
|
}
|
|
|
|
if filesCount == 0 {
|
|
return 0, fmt.Errorf("no DICOM data retrieved")
|
|
}
|
|
|
|
return filesCount, nil
|
|
}
|
|
|
|
// fetchDICOMWithPort uses a specific port for storescp + movescu.
|
|
func (s *DicomService) fetchDICOMWithPort(ctx context.Context, accessionNumber, destDir string, port int) (filesCount int, err error) {
|
|
storescpCtx, cancelStorescp := context.WithCancel(ctx)
|
|
defer cancelStorescp()
|
|
|
|
resultCh, stop, err := dicom.StartStoresCP(
|
|
storescpCtx,
|
|
s.cfg.DCMTK.Storescp,
|
|
s.cfg.OurAE.AETitle,
|
|
port,
|
|
destDir,
|
|
)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("start storescp: %w", err)
|
|
}
|
|
defer stop()
|
|
|
|
// Wait for storescp to be ready
|
|
if err := waitForStorescpReady(resultCh); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
studyUIDs, err := s.findStudyUIDs(ctx, accessionNumber)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
for _, studyUID := range studyUIDs {
|
|
exitCode, _, stderr, err := dicom.RunMoveSCUByStudyUID(
|
|
ctx,
|
|
s.cfg.DCMTK.Movescu,
|
|
s.cfg.OurAE.AETitle,
|
|
s.cfg.PACS.AETitle,
|
|
s.cfg.PACS.Host,
|
|
s.cfg.PACS.Port,
|
|
port,
|
|
studyUID,
|
|
300, // 5 min timeout
|
|
)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("movescu failed for study uid %q (exit %d): %s (stderr: %s)", studyUID, exitCode, err, stderr)
|
|
}
|
|
|
|
slog.Info("movescu completed",
|
|
"accession", accessionNumber,
|
|
"study_uid", studyUID,
|
|
"exit_code", exitCode,
|
|
)
|
|
}
|
|
|
|
// Count files
|
|
filesCount, err = countFiles(destDir)
|
|
if err != nil {
|
|
slog.Warn("count files failed", "dir", destDir, "error", err)
|
|
}
|
|
|
|
if filesCount == 0 {
|
|
return 0, fmt.Errorf("no DICOM data for accession %q", accessionNumber)
|
|
}
|
|
|
|
return filesCount, nil
|
|
}
|
|
|
|
func (s *DicomService) findStudyUIDs(ctx context.Context, accessionNumber string) ([]string, error) {
|
|
findscuBin := filepath.Join(filepath.Dir(s.cfg.DCMTK.Movescu), "findscu")
|
|
studyUIDs, _, stderr, err := dicom.RunFindSCUStudyUIDs(
|
|
ctx,
|
|
findscuBin,
|
|
s.cfg.OurAE.AETitle,
|
|
s.cfg.PACS.AETitle,
|
|
s.cfg.PACS.Host,
|
|
s.cfg.PACS.Port,
|
|
accessionNumber,
|
|
60,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("findscu failed for accession %q: %w (stderr: %s)", accessionNumber, err, stderr)
|
|
}
|
|
return studyUIDs, nil
|
|
}
|
|
|
|
// waitForStorescpReady waits for storescp to start or detects early failure.
|
|
func waitForStorescpReady(resultCh <-chan dicom.StorescpResult) error {
|
|
select {
|
|
case result := <-resultCh:
|
|
return fmt.Errorf("storescp exited prematurely (pid %d, err: %v, stderr: %s)",
|
|
result.PID, result.Err, result.Stderr)
|
|
case <-time.After(2 * time.Second):
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// countFiles counts regular files in a directory (non-recursive).
|
|
func countFiles(dir string) (int, error) {
|
|
return dicom.CountFiles(dir)
|
|
}
|
|
|
|
// portManager tracks which ports are currently handed out so that concurrent
// callers never receive the same port. All access to portsInUse goes through
// mu.
type portManager struct {
	mu         sync.Mutex
	portsInUse map[int]bool // port -> true while reserved
}

// newPortManager returns a portManager with no ports reserved.
func newPortManager() *portManager {
	return &portManager{
		portsInUse: make(map[int]bool),
	}
}

// allocate reserves the lowest free port in [base, base+size) and returns it
// together with a release function that gives the port back.
//
// Only ports in the valid range 1..65535 are considered, so port 0 is never
// handed out and a return value of 0 unambiguously means "nothing available"
// (also the case when size <= 0).
//
// The release function is never nil: when no port could be reserved it is a
// no-op, so callers may defer it unconditionally. Calling release more than
// once is safe; only the first call frees the port, which prevents a late
// second call from releasing a port that has since been reserved by someone
// else.
func (pm *portManager) allocate(base, size int) (int, func()) {
	const maxPort = 65535

	pm.mu.Lock()
	defer pm.mu.Unlock()

	for i := 0; i < size; i++ {
		port := base + i
		if port <= 0 || port > maxPort || pm.portsInUse[port] {
			continue
		}
		pm.portsInUse[port] = true

		// once makes the release idempotent per reservation.
		var once sync.Once
		return port, func() {
			once.Do(func() {
				pm.mu.Lock()
				defer pm.mu.Unlock()
				delete(pm.portsInUse, port)
			})
		}
	}
	return 0, func() {}
}
|