From c1d14728e86be9ef2e5a480eff88d85bcfef69b0 Mon Sep 17 00:00:00 2001 From: cookesan <6601329+cookesan@users.noreply.github.com> Date: Thu, 21 May 2026 04:48:14 -0400 Subject: [PATCH 1/3] fs: handle early writeFile stream errors Attach a temporary error listener to readable stream inputs before opening the destination file. This lets writeFile() reject with the stream error instead of allowing an early source error to become an uncaught exception. Remove the listener when the write finishes or when opening the destination fails. Signed-off-by: cookesan <6601329+cookesan@users.noreply.github.com> --- lib/internal/fs/promises.js | 84 +++++++++++++++---- .../test-fs-promises-file-handle-writeFile.js | 30 +++++++ test/parallel/test-fs-promises-writefile.js | 57 +++++++++++++ 3 files changed, 155 insertions(+), 16 deletions(-) diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 720bd1319b381f..c238ab868af77a 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -107,7 +107,11 @@ const EventEmitter = require('events'); const { StringDecoder } = require('string_decoder'); const { kFSWatchStart, watch } = require('internal/fs/watchers'); const nonNativeWatcher = require('internal/fs/recursive_watch'); -const { isIterable } = require('internal/streams/utils'); +const { + isIterable, + isReadableErrored, + isReadableNodeStream, +} = require('internal/streams/utils'); const assert = require('internal/assert'); const permission = require('internal/process/permission'); @@ -1116,24 +1120,62 @@ function checkAborted(signal) { throw new AbortError(undefined, { cause: signal.reason }); } -async function writeFileHandle(filehandle, data, signal, encoding) { - checkAborted(signal); +function makeWriteFileStreamErrorHandler(data) { + if (!isReadableNodeStream(data) || + typeof data.removeListener !== 'function') { + return undefined; + } + + let error; + let errored = false; + function onError(err) { + error = err; + errored = true; + } + const streamError = isReadableErrored(data); + if (streamError != null) + onError(streamError); + data.on('error', onError); + + return { + __proto__: null, + check() { + if (errored) + throw error; + }, + cleanup() { + data.removeListener('error', onError); + }, + }; +} + +async function writeFileHandle(filehandle, data, signal, encoding, streamErrorHandler) { if (isCustomIterable(data)) { - for await (const buf of data) { + streamErrorHandler ??= makeWriteFileStreamErrorHandler(data); + try { checkAborted(signal); - const toWrite = - isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8'); - let remaining = toWrite.byteLength; - while (remaining > 0) { - const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); - const { bytesWritten } = await write( - filehandle, toWrite, toWrite.byteLength - remaining, writeSize); - remaining -= bytesWritten; + streamErrorHandler?.check(); + for await (const buf of data) { checkAborted(signal); + streamErrorHandler?.check(); + const toWrite = + isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8'); + let remaining = toWrite.byteLength; + while (remaining > 0) { + const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); + const { bytesWritten } = await write( + filehandle, toWrite, toWrite.byteLength - remaining, writeSize); + remaining -= bytesWritten; + checkAborted(signal); + streamErrorHandler?.check(); + } } + } finally { + streamErrorHandler?.cleanup(); } return; } + checkAborted(signal); data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); let remaining = data.byteLength; if (remaining === 0) return; @@ -1891,13 +1933,23 @@ async function writeFile(path, data, options) { } validateAbortSignal(options.signal); + checkAborted(options.signal); + const streamErrorHandler = makeWriteFileStreamErrorHandler(data); + if (path instanceof FileHandle) - return writeFileHandle(path, data, options.signal, options.encoding); + return writeFileHandle( + path, data, options.signal, options.encoding, streamErrorHandler); - checkAborted(options.signal); + let fd; + try { + fd = await open(path, flag, options.mode); + } catch (err) { + streamErrorHandler?.cleanup(); + throw err; + } - const fd = await open(path, flag, options.mode); - let writeOp = writeFileHandle(fd, data, options.signal, options.encoding); + let writeOp = writeFileHandle( + fd, data, options.signal, options.encoding, streamErrorHandler); if (flush) { writeOp = handleFdSync(writeOp, fd); diff --git a/test/parallel/test-fs-promises-file-handle-writeFile.js b/test/parallel/test-fs-promises-file-handle-writeFile.js index 2c1a80e4f52d49..7635ec57567db0 100644 --- a/test/parallel/test-fs-promises-file-handle-writeFile.js +++ b/test/parallel/test-fs-promises-file-handle-writeFile.js @@ -48,6 +48,7 @@ async function doWriteAndCancel() { const dest = path.resolve(tmpDir, 'tmp.txt'); const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); +const errorDest = path.resolve(tmpDir, 'tmp-error.txt'); const stream = Readable.from(['a', 'b', 'c']); const stream2 = Readable.from(['ümlaut', ' ', 'sechzig']); const iterable = { @@ -65,6 +66,15 @@ function iterableWith(value) { } }; } + +function createEarlyErrorStream(error) { + const stream = new Readable({ + read() {} + }); + process.nextTick(() => stream.destroy(error)); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -94,6 +104,25 @@ async function doWriteStream() { } } +async function doWriteStreamError() { + const fileHandle = await open(errorDest, 'w+'); + const error = new Error('early file handle writeFile stream error'); + const stream = createEarlyErrorStream(error); + const uncaughtException = common.mustNotCall( + 'stream errors should reject FileHandle.writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fileHandle.writeFile(stream), + { message: error.message } + ); + } finally { + process.removeListener('uncaughtException', uncaughtException); + await fileHandle.close(); + } +} + async function doWriteStreamWithCancel() { const controller = new AbortController(); const { signal } = controller; @@ -190,6 +219,7 @@ async function doWriteInvalidValues() { await validateWriteFile(); await doWriteAndCancel(); await doWriteStream(); + await doWriteStreamError(); await doWriteStreamWithCancel(); await doWriteIterable(); await doWriteInvalidIterable(); diff --git a/test/parallel/test-fs-promises-writefile.js b/test/parallel/test-fs-promises-writefile.js index 25df61b2b48414..b536cf54c04c85 100644 --- a/test/parallel/test-fs-promises-writefile.js +++ b/test/parallel/test-fs-promises-writefile.js @@ -13,6 +13,7 @@ tmpdir.refresh(); const dest = path.resolve(tmpDir, 'tmp.txt'); const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); +const errorDest = path.resolve(tmpDir, 'tmp-error.txt'); const buffer = Buffer.from('abc'.repeat(1000)); const buffer2 = Buffer.from('xyz'.repeat(1000)); const stream = Readable.from(['a', 'b', 'c']); @@ -25,6 +26,16 @@ const iterable = { yield 'c'; } }; +const streamLikeIterable = { + expected: 'abc', + pipe: common.mustNotCall('pipe should not be called for custom iterables'), + on: common.mustNotCall('on should not be called without removeListener'), + *[Symbol.iterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } +}; const veryLargeBuffer = { expected: 'dogs running'.repeat(512 * 1024), @@ -40,6 +51,15 @@ function iterableWith(value) { } }; } + +function createEarlyErrorStream(error) { + const stream = new Readable({ + read() {} + }); + process.nextTick(() => stream.destroy(error)); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -70,6 +90,34 @@ async function doWriteStream() { assert.deepStrictEqual(data, expected); } +async function doWriteStreamError() { + const error = new Error('early writeFile stream error'); + const stream = createEarlyErrorStream(error); + const uncaughtException = common.mustNotCall( + 'stream errors should reject writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fsPromises.writeFile(errorDest, stream), + { message: error.message } + ); + assert.strictEqual(stream.listenerCount('error'), 0); + } finally { + process.removeListener('uncaughtException', uncaughtException); + } +} + +async function doWriteStreamOpenError() { + const stream = Readable.from(['a']); + + await assert.rejects( + fsPromises.writeFile(path.resolve(tmpDir, 'not-found', 'tmp.txt'), stream), + { code: 'ENOENT' } + ); + assert.strictEqual(stream.listenerCount('error'), 0); +} + async function doWriteStreamWithCancel() { const controller = new AbortController(); const { signal } = controller; @@ -86,6 +134,12 @@ async function doWriteIterable() { assert.deepStrictEqual(data, iterable.expected); } +async function doWriteStreamLikeIterable() { + await fsPromises.writeFile(dest, streamLikeIterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, streamLikeIterable.expected); +} + async function doWriteInvalidIterable() { await Promise.all( [42, 42n, {}, Symbol('42'), true, undefined, null, NaN].map((value) => @@ -168,8 +222,11 @@ async function doReadWithEncoding() { await doRead(); await doReadWithEncoding(); await doWriteStream(); + await doWriteStreamError(); + await doWriteStreamOpenError(); await doWriteStreamWithCancel(); await doWriteIterable(); + await doWriteStreamLikeIterable(); await doWriteInvalidIterable(); await doWriteIterableWithEncoding(); await doWriteBufferIterable(); From 614394afeebc6e6b4237842bbd8f78fe1883206c Mon Sep 17 00:00:00 2001 From: cookesan <6601329+cookesan@users.noreply.github.com> Date: Sat, 23 May 2026 21:24:29 -0400 Subject: [PATCH 2/3] test: cover already errored writeFile streams Signed-off-by: cookesan <6601329+cookesan@users.noreply.github.com> --- .../test-fs-promises-file-handle-writeFile.js | 30 +++++++++++++++++++ test/parallel/test-fs-promises-writefile.js | 27 +++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/test/parallel/test-fs-promises-file-handle-writeFile.js b/test/parallel/test-fs-promises-file-handle-writeFile.js index 7635ec57567db0..74a597c4377309 100644 --- a/test/parallel/test-fs-promises-file-handle-writeFile.js +++ b/test/parallel/test-fs-promises-file-handle-writeFile.js @@ -75,6 +75,14 @@ function createEarlyErrorStream(error) { return stream; } +function createErroredStream(error) { + const stream = new Readable({ + read() {} + }); + stream.destroy(error); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -117,6 +125,27 @@ async function doWriteStreamError() { fileHandle.writeFile(stream), { message: error.message } ); + assert.strictEqual(stream.listenerCount('error'), 0); + } finally { + process.removeListener('uncaughtException', uncaughtException); + await fileHandle.close(); + } +} + +async function doWriteAlreadyErroredStream() { + const fileHandle = await open(errorDest, 'w+'); + const error = new Error('already errored file handle writeFile stream'); + const stream = createErroredStream(error); + const uncaughtException = common.mustNotCall( + 'already errored streams should reject FileHandle.writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fileHandle.writeFile(stream), + { message: error.message } + ); + assert.strictEqual(stream.listenerCount('error'), 0); } finally { process.removeListener('uncaughtException', uncaughtException); await fileHandle.close(); @@ -220,6 +249,7 @@ async function doWriteInvalidValues() { await doWriteAndCancel(); await doWriteStream(); await doWriteStreamError(); + await doWriteAlreadyErroredStream(); await doWriteStreamWithCancel(); await doWriteIterable(); await doWriteInvalidIterable(); diff --git a/test/parallel/test-fs-promises-writefile.js b/test/parallel/test-fs-promises-writefile.js index b536cf54c04c85..0779155c134360 100644 --- a/test/parallel/test-fs-promises-writefile.js +++ b/test/parallel/test-fs-promises-writefile.js @@ -60,6 +60,14 @@ function createEarlyErrorStream(error) { return stream; } +function createErroredStream(error) { + const stream = new Readable({ + read() {} + }); + stream.destroy(error); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -108,6 +116,24 @@ async function doWriteStreamError() { } } +async function doWriteAlreadyErroredStream() { + const error = new Error('already errored writeFile stream'); + const stream = createErroredStream(error); + const uncaughtException = common.mustNotCall( + 'already errored streams should reject writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fsPromises.writeFile(errorDest, stream), + { message: error.message } + ); + assert.strictEqual(stream.listenerCount('error'), 0); + } finally { + process.removeListener('uncaughtException', uncaughtException); + } +} + async function doWriteStreamOpenError() { const stream = Readable.from(['a']); @@ -223,6 +249,7 @@ async function doReadWithEncoding() { await doReadWithEncoding(); await doWriteStream(); await doWriteStreamError(); + await doWriteAlreadyErroredStream(); await doWriteStreamOpenError(); await doWriteStreamWithCancel(); await doWriteIterable(); From 2af6b28d7e1a7aa11e99d7feeb6cbeac618b2cea Mon Sep 17 00:00:00 2001 From: cookesan <6601329+cookesan@users.noreply.github.com> Date: Sun, 24 May 2026 00:56:41 -0400 Subject: [PATCH 3/3] fs: handle already errored writeFile streams Keep the temporary writeFile error listener through the pending next-tick error emission when a stream is already errored at entry. This lets writeFile reject from the stored stream error without letting the stream emit an unhandled error, and still removes the listener after the pending emission. Signed-off-by: cookesan <6601329+cookesan@users.noreply.github.com> --- lib/internal/fs/promises.js | 7 ++++++- test/parallel/test-fs-promises-file-handle-writeFile.js | 9 ++++++++- test/parallel/test-fs-promises-writefile.js | 5 +++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index c238ab868af77a..4d5be01e65d9ed 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -1133,6 +1133,7 @@ function makeWriteFileStreamErrorHandler(data) { errored = true; } const streamError = isReadableErrored(data); + const wasErrored = streamError != null; if (streamError != null) onError(streamError); data.on('error', onError); @@ -1144,7 +1145,11 @@ function makeWriteFileStreamErrorHandler(data) { throw error; }, cleanup() { - data.removeListener('error', onError); + if (wasErrored) { + process.nextTick(() => data.removeListener('error', onError)); + } else { + data.removeListener('error', onError); + } }, }; } diff --git a/test/parallel/test-fs-promises-file-handle-writeFile.js b/test/parallel/test-fs-promises-file-handle-writeFile.js index 74a597c4377309..a657f49e4616af 100644 --- a/test/parallel/test-fs-promises-file-handle-writeFile.js +++ b/test/parallel/test-fs-promises-file-handle-writeFile.js @@ -83,6 +83,10 @@ function createErroredStream(error) { return stream; } +function waitForNextTick() { + return new Promise((resolve) => process.nextTick(resolve)); +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -125,7 +129,9 @@ async function doWriteStreamError() { fileHandle.writeFile(stream), { message: error.message } ); - assert.strictEqual(stream.listenerCount('error'), 0); + // FileHandle.writeFile() starts iteration before the next-tick error, + // so the stream async iterator retains its own error listener. + assert.strictEqual(stream.listenerCount('error'), 1); } finally { process.removeListener('uncaughtException', uncaughtException); await fileHandle.close(); @@ -145,6 +151,7 @@ async function doWriteAlreadyErroredStream() { fileHandle.writeFile(stream), { message: error.message } ); + await waitForNextTick(); assert.strictEqual(stream.listenerCount('error'), 0); } finally { process.removeListener('uncaughtException', uncaughtException); diff --git a/test/parallel/test-fs-promises-writefile.js b/test/parallel/test-fs-promises-writefile.js index 0779155c134360..812522a8b5636e 100644 --- a/test/parallel/test-fs-promises-writefile.js +++ b/test/parallel/test-fs-promises-writefile.js @@ -68,6 +68,10 @@ function createErroredStream(error) { return stream; } +function waitForNextTick() { + return new Promise((resolve) => process.nextTick(resolve)); +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -128,6 +132,7 @@ async function doWriteAlreadyErroredStream() { fsPromises.writeFile(errorDest, stream), { message: error.message } ); + await waitForNextTick(); assert.strictEqual(stream.listenerCount('error'), 0); } finally { process.removeListener('uncaughtException', uncaughtException);