
Archiver reader lock refactor #3807

Open
jacomago wants to merge 9 commits into ControlSystemStudio:master from jacomago:archiver-reader-lock-refactor


@jacomago
Contributor

Refactors for ArchiveFetchJob, ApplianceValueIterator, ApplianceArchiveReader.

Refactors:

  • Remove IteratorListener (didn't seem necessary to me)
  • Add more logging
  • Make data fetching in ArchiveFetchJob asynchronous

Fixes:

  • Make lock in ApplianceValueIterator per object rather than global

Feature:

  • Add a timeout preference, archive_read_timeout_ms, to databrowser_preferences so that an unresponsive archive source does not block fetching from the others.

Testing:

  • Add tests for the fix
  • Add more generic tests for the involved classes

Checklist

  • Testing:

    • [x] The feature has automated tests
    • [x] Tests were run
    • If not, explain how you tested your changes
  • Documentation:

    • [x] The feature is documented
    • The documentation is up to date
    • Release notes:
      • Added an entry if the change is breaking or significant
      • Added an entry when adding a new feature

jacomago and others added 9 commits May 26, 2026 10:15
With synchronized hasNext(), a worker blocked inside mainIterator.hasNext()
holds the 'this' monitor indefinitely; close() cannot acquire it and hangs.
closeCompletesWhileHasNextIsBlocking reproduces this with @Timeout(5).

Includes FakeDataRetrieval/FakeApplianceArchiveReader/BlockingGenMsgIterator
test infrastructure and Mockito dependency.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
hasNext() no longer holds the 'this' monitor while blocking on a streaming
network read. close() can now acquire the lock concurrently, set closed=true,
and close the underlying stream — which in turn unblocks the reader.

next() still guards its mainIterator.next() call with synchronized(this) and
rechecks closed, so the close-between-hasNext-and-next race remains safe.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
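
The hasNext()/close() behaviour described in the two commit messages above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class name HasNextCloseSketch, the demo() helper, and the choice to throw NoSuchElementException from next() after close are assumptions made for the example. Only the field names mainIterator and closed come from the commit text.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an iterator that reads from a blocking stream. */
class HasNextCloseSketch<T> implements Iterator<T>, Closeable {
    private final Iterator<T> mainIterator; // may block inside hasNext()
    private final Closeable stream;         // closing it unblocks the read
    private volatile boolean closed = false;

    HasNextCloseSketch(Iterator<T> mainIterator, Closeable stream) {
        this.mainIterator = mainIterator;
        this.stream = stream;
    }

    @Override
    public boolean hasNext() {
        // Not synchronized: a blocked read must not hold the 'this' monitor,
        // otherwise close() could never acquire it.
        if (closed) return false;
        boolean more = mainIterator.hasNext();
        return more && !closed;
    }

    @Override
    public T next() {
        synchronized (this) {
            // Recheck under the lock: close() may have run after hasNext().
            if (closed) throw new NoSuchElementException("iterator is closed");
            return mainIterator.next();
        }
    }

    @Override
    public void close() throws IOException {
        synchronized (this) {
            closed = true;
        }
        stream.close(); // unblocks a reader stuck in hasNext()
    }

    /** Returns true if close() completes while another thread blocks in hasNext(). */
    static boolean demo() {
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Iterator<String> blocking = new Iterator<String>() {
            @Override public boolean hasNext() {
                entered.countDown();
                try {
                    release.await(); // simulates a blocking network read
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return true;
            }
            @Override public String next() { return "sample"; }
        };
        // Closing the fake stream releases the blocked read.
        HasNextCloseSketch<String> it = new HasNextCloseSketch<>(blocking, release::countDown);
        CompletableFuture<Boolean> reader = CompletableFuture.supplyAsync(it::hasNext);
        try {
            if (!entered.await(5, TimeUnit.SECONDS)) return false;
            it.close();
            return !reader.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return false;
        }
    }
}
```

With a synchronized hasNext(), the call to close() in demo() would wait on the monitor held by the blocked reader and never get to release it.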
…tches

twoIteratorFetchesProceedConcurrently shows that two iterators for
independent PVs block each other inside fetchDataInternal() because
they compete for a single static monitor.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Changing the lock from static to a per-instance final field means each
iterator only serialises its own concurrent fetchDataInternal calls.
Fetches for different PVs can now proceed concurrently.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
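
The difference between the static and the per-instance lock can be illustrated with a small sketch. The class name PerInstanceLockSketch, the Runnable parameter, and the demo() helper are hypothetical; only the method name fetchDataInternal and the static-to-instance change are taken from the commit messages above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for an iterator that serialises its own fetches. */
class PerInstanceLockSketch {
    // Before (as described in the commit message): one monitor for all instances.
    // private static final Object lock = new Object();

    // After: each instance owns its lock, so different PVs do not contend.
    private final Object lock = new Object();

    void fetchDataInternal(Runnable fetch) {
        synchronized (lock) {
            fetch.run();
        }
    }

    /** Returns true if a second instance can fetch while the first is still fetching. */
    static boolean demo() {
        PerInstanceLockSketch first = new PerInstanceLockSketch();
        PerInstanceLockSketch second = new PerInstanceLockSketch();
        AtomicBoolean firstFetching = new AtomicBoolean(false);
        AtomicBoolean overlapped = new AtomicBoolean(false);
        CountDownLatch firstInside = new CountDownLatch(1);
        CountDownLatch secondDone = new CountDownLatch(1);

        Thread slow = new Thread(() -> first.fetchDataInternal(() -> {
            firstFetching.set(true);
            firstInside.countDown();
            try {
                // Stay inside the lock until the second fetch has finished.
                secondDone.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            firstFetching.set(false);
        }));
        slow.start();
        try {
            if (!firstInside.await(5, TimeUnit.SECONDS)) return false;
            // With a static lock this call would wait for the slow fetch to end.
            second.fetchDataInternal(() -> overlapped.set(firstFetching.get()));
            secondDone.countDown();
            slow.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return overlapped.get();
    }
}
```

Switching the field back to the commented-out static declaration makes demo() return false, because the second fetch only runs after the first one gives up waiting.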
ApplianceArchiveReaderTest: verifies enum PVs route to non-numeric iterator,
numeric scalars use the optimized path, cancel() closes active iterators,
and the WeakHashMap releases references after GC.

ApplianceMeanValueIteratorTest: checks the mean_<interval>() operator URL,
and that determineDisplay rejects enum/waveform types.

ApplianceOptimizedValueIteratorTest: verifies the optimized_N() URL,
VStatistics output when useStatistics=true, and VNumber when false.

ApplianceNonNumericOptimizedValueIteratorTest: checks n=1 routes raw fetch,
n>1 uses nth_N() operator, and the 1.5x boundary gives n=2.

ApplianceStatisticsValueIteratorTest: verifies all five operator streams
(mean, std, min, max, count) are opened, and that close() closes all five.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
faultySourceTimesOutAndLoopContinues fails: the slow source blocks
WorkerThread.run() indefinitely because no timeout guards the fetch
loop; @Timeout(5) fires after 5 s, proving the bug.

Scaffolding added so the tests compile:
- archive_read_timeout_ms preference key and 30 s default
- ArchiveFetchJob.openReader() hook so TestableFetchJob can inject
  fake readers without going through ArchiveReaders SPI
- Package-private test-only constructor that skips JobManager
- Mockito added to databrowser pom

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
WorkerThread.run() now submits each source fetch to Activator.thread_pool
and calls Future.get(archive_read_timeout_ms, MILLISECONDS). On timeout:
- Future.cancel(true) interrupts the carrier thread
- The active ArchiveReader.cancel() is called to close the connection
- archiveFetchFailed() is reported and the loop advances to the next source

fetchFromSource() extracts the reader-open + iterator-drain logic that
previously lived inline in run(), keeping the outer loop readable.

archive_read_timeout_ms=0 disables the timeout (Future.get() with no
deadline), preserving the previous behaviour when needed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
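
The per-source timeout loop described in the commit above might look roughly like this. It is a sketch under stated assumptions: FetchTimeoutSketch, fetchAll(), demo(), and the string "log" are invented for illustration, and a plain ExecutorService stands in for Activator.thread_pool. The real code also calls ArchiveReader.cancel() and archiveFetchFailed(), which are only indicated in comments here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FetchTimeoutSketch {
    /** Fetches from each source in order; a slow source is abandoned after timeoutMs. */
    static List<String> fetchAll(Map<String, Callable<String>> sources,
                                 long timeoutMs, ExecutorService pool) {
        List<String> log = new ArrayList<>();
        for (Map.Entry<String, Callable<String>> source : sources.entrySet()) {
            Future<String> future = pool.submit(source.getValue());
            try {
                // A timeout of 0 disables the deadline, as in the commit message.
                String data = timeoutMs > 0
                        ? future.get(timeoutMs, TimeUnit.MILLISECONDS)
                        : future.get();
                log.add(source.getKey() + ": " + data);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupts the thread running the fetch
                // The PR also cancels the active reader and reports the failure here.
                log.add(source.getKey() + ": timed out");
            } catch (ExecutionException e) {
                log.add(source.getKey() + ": failed (" + e.getCause() + ")");
            } catch (InterruptedException e) {
                future.cancel(true);
                Thread.currentThread().interrupt();
                break;
            }
        }
        return log;
    }

    /** One blocked source followed by a healthy one, with a 200 ms timeout. */
    static List<String> demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            Map<String, Callable<String>> sources = new LinkedHashMap<>();
            sources.put("slow", () -> { Thread.sleep(60_000); return "never"; });
            sources.put("healthy", () -> "42 samples");
            return fetchAll(sources, 200, pool);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In demo(), the blocked source is reported as timed out and the loop still reaches the healthy source. The sources are still processed one at a time; the extra thread exists only so that the waiting side can give up.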
Broadens test coverage beyond the three regression scenarios (static
lock, hasNext/close deadlock, per-source timeout) to cover the normal
code paths that a reader of these classes relies on.

- ApplianceValueIteratorExtractDataTest (new, 13 tests): extractData()
  across all supported payload types — scalar numerics, enum, string,
  waveform — plus alarm severity and display header extraction.

- ApplianceArchiveReaderTest (+6 tests): getOptimizedValues() routing
  (points≤count→raw, statistics, mean, old-appliance fallback on fetch
  failure), iterator map registration, and close()→cancel() delegation.

- ApplianceStatisticsValueIteratorTest (+2 tests): VStatistics assembly
  from five sub-iterators, and null-safe next() after close().

- ArchiveFetchJobTest (+7 tests): UnknownChannelException→channelNotFound
  routing (full and partial), two healthy sources completing together,
  IOException→archiveFetchFailed, cancel-before-start, and RAW vs
  OPTIMIZED request type dispatch.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- ApplianceValueIterator.close(): log IOException at WARNING instead of
  wrapping it in IllegalStateException; move closed=true into finally so
  it is always set even when the stream close throws
- ApplianceStatisticsValueIterator.closeStream(): add package-level logger
  and emit WARNING instead of silently swallowing the IOException
- ApplianceArchiveReader.getOptimizedValues(): log FINE for expected type
  fallbacks and WARNING when the optimized or count-based path fails
- ArchiveFetchJob.WorkerThread: log WARNING for ExecutionException and
  generic Exception fetch failures, not just TimeoutException

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
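
The close() handling from the first bullet of the commit above can be sketched as below. LoggingCloseSketch, isClosed(), demo(), and the log message are hypothetical; the sketch only shows the pattern named in the commit text: log the IOException at WARNING instead of rethrowing, and set closed=true in a finally block.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in showing a close() that always marks itself closed. */
class LoggingCloseSketch {
    private static final Logger logger =
            Logger.getLogger(LoggingCloseSketch.class.getName());

    private final Closeable stream;
    private volatile boolean closed = false;

    LoggingCloseSketch(Closeable stream) {
        this.stream = stream;
    }

    boolean isClosed() {
        return closed;
    }

    void close() {
        synchronized (this) {
            try {
                stream.close();
            } catch (IOException e) {
                // Logged rather than wrapped in IllegalStateException.
                logger.log(Level.WARNING, "Failed to close archive stream", e);
            } finally {
                // Always set, even when the stream close throws.
                closed = true;
            }
        }
    }

    /** Returns true if the iterator is marked closed although the stream close failed. */
    static boolean demo() {
        LoggingCloseSketch it = new LoggingCloseSketch(() -> {
            throw new IOException("connection reset");
        });
        it.close(); // logs a WARNING, does not throw
        return it.isClosed();
    }
}
```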
@jacomago jacomago self-assigned this May 26, 2026
@jacomago
Contributor Author

Not sure whether to split this up into multiple MRs:

  • Lock changes
  • Timeout addition
  • Asynchronous fetching

@kasemir
Collaborator

kasemir commented May 26, 2026

"Make fetch data in ArchiveFetchJob asynchronous"

The archive fetch job is already a background thread, and concurrent_requests controls how many are started.
Why add one more layer of threading?

@kasemir
Collaborator

kasemir commented May 26, 2026

Is the intent of the added level of threading only for the timeout?

Before:
One ArchiveFetchJob iterates over data sources, reads from one source, merges with data-fetched-so-far?

With this PR:
One ArchiveFetchJob iterates over data sources, starts fetch in background to read from one source, 'gets' the future to obtain the result, merges with data-fetched-so-far?
