Files
dicom-iso/internal/service/dicom.go
2026-06-05 08:11:44 +07:00

223 lines
5.3 KiB
Go

package service
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"mkiso-server/internal/config"
"mkiso-server/pkg/dicom"
)
// DicomService handles DICOM retrieval from PACS via storescp + movescu.
// Its fetch methods lease a listener port from portAllocMgr and read the
// OurAE, PACS, and DCMTK settings from cfg.
type DicomService struct {
// cfg supplies the OurAE, PACS, and DCMTK settings read by the fetch methods.
cfg *config.Config
// portAllocMgr tracks which storescp listener ports are currently leased.
portAllocMgr *portManager
}
// NewDicomService builds a DicomService that reads its settings from cfg and
// starts with an empty port manager (no ports leased).
func NewDicomService(cfg *config.Config) *DicomService {
	svc := new(DicomService)
	svc.cfg = cfg
	svc.portAllocMgr = newPortManager()
	return svc
}
// FetchDICOM retrieves the DICOM files for one accession number into destDir.
// A port is leased from the configured range (OurAE.BasePort, OurAE.PortRange)
// for the duration of the call and handed to fetchDICOMWithPort, which runs
// storescp and movescu on it.
// It returns the number of files retrieved, or an error when no port is free
// or the fetch itself fails.
func (s *DicomService) FetchDICOM(ctx context.Context, accessionNumber, destDir string) (filesCount int, err error) {
	listenPort, free := s.portAllocMgr.allocate(s.cfg.OurAE.BasePort, s.cfg.OurAE.PortRange)
	if listenPort == 0 {
		err = fmt.Errorf("no available port for storescp")
		return 0, err
	}
	// Hand the port back once the fetch has finished.
	defer free()

	filesCount, err = s.fetchDICOMWithPort(ctx, accessionNumber, destDir, listenPort)
	return filesCount, err
}
// FetchDICOMMultiple retrieves DICOM files for multiple accession numbers
// into destDir using a single storescp listener.
//
// It leases a port from the configured range, starts storescp ONCE, runs
// movescu sequentially for each accession number, and stops storescp when the
// function returns. A movescu failure for one accession is logged and
// skipped; the remaining accessions are still attempted.
//
// It returns the file count reported by countFiles for destDir after all
// moves have run. An error is returned when no port is free, storescp cannot
// be started or exits during the readiness wait, ctx is done before an
// accession is attempted, counting fails without yielding any files, or no
// files were found in destDir.
func (s *DicomService) FetchDICOMMultiple(ctx context.Context, accessionNumbers []string, destDir string) (totalFiles int, err error) {
	// Timeout value handed to movescu for each accession (300 = 5 minutes).
	const moveTimeoutSec = 300

	port, release := s.portAllocMgr.allocate(s.cfg.OurAE.BasePort, s.cfg.OurAE.PortRange)
	if port == 0 {
		return 0, fmt.Errorf("no available port for storescp")
	}
	defer release()

	// Start storescp once; it is shared by every movescu run below.
	storescpCtx, cancelStorescp := context.WithCancel(ctx)
	defer cancelStorescp()

	resultCh, stop, err := dicom.StartStoresCP(
		storescpCtx,
		s.cfg.DCMTK.Storescp,
		s.cfg.OurAE.AETitle,
		port,
		destDir,
	)
	if err != nil {
		return 0, fmt.Errorf("start storescp: %w", err)
	}
	// The deferred stop is the single shutdown point for every return path,
	// so no path needs to call stop explicitly.
	defer stop()

	// Wait for storescp to be ready.
	if err := waitForStorescpReady(resultCh); err != nil {
		return 0, err
	}

	// Run movescu for each accession.
	for _, acc := range accessionNumbers {
		// Do not start another move once the caller has given up.
		if err := ctx.Err(); err != nil {
			return 0, err
		}

		exitCode, _, stderr, err := dicom.RunMoveSCU(
			ctx,
			s.cfg.DCMTK.Movescu,
			s.cfg.OurAE.AETitle,
			s.cfg.PACS.AETitle,
			s.cfg.PACS.Host,
			s.cfg.PACS.Port,
			port,
			acc,
			moveTimeoutSec,
		)
		if err != nil {
			slog.Warn("movescu failed for accession",
				"accession", acc,
				"exit_code", exitCode,
				"stderr", stderr,
				"error", err,
			)
			// Continue with next accession — partial success is acceptable.
			continue
		}
		slog.Info("movescu completed",
			"accession", acc,
			"exit_code", exitCode,
		)
	}

	// Count files retrieved.
	totalFiles, err = countFiles(destDir)
	if err != nil {
		if totalFiles == 0 {
			// Surface the real cause instead of a misleading "no data" error.
			return 0, fmt.Errorf("count files in %q: %w", destDir, err)
		}
		// A count was still produced; keep the best-effort result.
		slog.Warn("count files failed", "dir", destDir, "error", err)
	}
	if totalFiles == 0 {
		return 0, fmt.Errorf("no DICOM data retrieved")
	}
	return totalFiles, nil
}
// fetchDICOMWithPort retrieves the DICOM files for one accession number into
// destDir, using the given port for both storescp and movescu.
//
// It starts storescp on port, waits for the readiness window, runs movescu
// for accessionNumber, and stops storescp when the function returns.
//
// It returns the file count reported by countFiles for destDir. An error is
// returned when storescp cannot be started or exits during the readiness
// wait, movescu fails (the underlying error is wrapped so callers can use
// errors.Is / errors.As), counting fails without yielding any files, or no
// files were found in destDir.
func (s *DicomService) fetchDICOMWithPort(ctx context.Context, accessionNumber, destDir string, port int) (filesCount int, err error) {
	// Timeout value handed to movescu (300 = 5 minutes).
	const moveTimeoutSec = 300

	storescpCtx, cancelStorescp := context.WithCancel(ctx)
	defer cancelStorescp()

	resultCh, stop, err := dicom.StartStoresCP(
		storescpCtx,
		s.cfg.DCMTK.Storescp,
		s.cfg.OurAE.AETitle,
		port,
		destDir,
	)
	if err != nil {
		return 0, fmt.Errorf("start storescp: %w", err)
	}
	defer stop()

	// Wait for storescp to be ready.
	if err := waitForStorescpReady(resultCh); err != nil {
		return 0, err
	}

	// Run movescu.
	exitCode, _, stderr, err := dicom.RunMoveSCU(
		ctx,
		s.cfg.DCMTK.Movescu,
		s.cfg.OurAE.AETitle,
		s.cfg.PACS.AETitle,
		s.cfg.PACS.Host,
		s.cfg.PACS.Port,
		port,
		accessionNumber,
		moveTimeoutSec,
	)
	if err != nil {
		// %w keeps the cause in the error chain; the rendered text is unchanged.
		return 0, fmt.Errorf("movescu failed (exit %d): %w (stderr: %s)", exitCode, err, stderr)
	}
	slog.Info("movescu completed",
		"accession", accessionNumber,
		"exit_code", exitCode,
	)

	// Count files.
	filesCount, err = countFiles(destDir)
	if err != nil {
		if filesCount == 0 {
			// Surface the real cause instead of a misleading "no data" error.
			return 0, fmt.Errorf("count files in %q: %w", destDir, err)
		}
		// A count was still produced; keep the best-effort result.
		slog.Warn("count files failed", "dir", destDir, "error", err)
	}
	if filesCount == 0 {
		return 0, fmt.Errorf("no DICOM data for accession %q", accessionNumber)
	}
	return filesCount, nil
}
// waitForStorescpReady gives storescp a 2-second window to fail. If a result
// arrives on resultCh within that window, storescp is reported as having
// exited prematurely; if the window elapses first, it is treated as ready and
// nil is returned.
func waitForStorescpReady(resultCh <-chan dicom.StorescpResult) error {
	grace := time.NewTimer(2 * time.Second)
	defer grace.Stop()

	select {
	case <-grace.C:
		// Nothing was reported during the window: assume the listener is up.
		return nil
	case res := <-resultCh:
		return fmt.Errorf("storescp exited prematurely (pid %d, err: %v, stderr: %s)",
			res.PID, res.Err, res.Stderr)
	}
}
// countFiles reports the number of files in dir by delegating to
// dicom.CountFiles.
// NOTE(review): the previous comment said this counts regular files
// non-recursively — confirm against dicom.CountFiles.
func countFiles(dir string) (int, error) {
	n, err := dicom.CountFiles(dir)
	return n, err
}
// portManager hands out ports from a caller-supplied range and remembers
// which ones are currently leased. Every access to portsInUse is guarded by mu.
type portManager struct {
	mu         sync.Mutex
	portsInUse map[int]bool
}

// newPortManager returns a portManager with no ports leased.
func newPortManager() *portManager {
	return &portManager{
		portsInUse: make(map[int]bool),
	}
}

// allocate leases the lowest free port in [base, base+size) and returns it
// together with a release function that gives the port back.
//
// It returns port 0 when no port in the range is available (including when
// size <= 0). Because 0 is the "none available" sentinel, non-positive port
// numbers are never leased.
//
// The release function is never nil: on failure it is a no-op, so callers may
// defer it unconditionally. It is also idempotent — calling it again after
// the port has been handed to another caller does not free that newer lease.
func (pm *portManager) allocate(base, size int) (int, func()) {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	for i := 0; i < size; i++ {
		port := base + i
		if port <= 0 || pm.portsInUse[port] {
			continue
		}
		pm.portsInUse[port] = true

		// released is only read and written while pm.mu is held.
		released := false
		return port, func() {
			pm.mu.Lock()
			defer pm.mu.Unlock()
			if released {
				return
			}
			released = true
			delete(pm.portsInUse, port)
		}
	}
	return 0, func() {}
}