Skip to content

文件流

流是指数据的流动,数据从一个地方缓缓的流动到另一个地方。

流是有方向的:

  • 可读流 Readable:数据从源头流向内存
  • 可写流 Writeable:数据从内存流向源头
  • 双工流 Duplex:数据既可从源头流向内存,又可从内存流向源头

为什么需要流?

  • 其他的介质和内存的数据存储量不一致
    • 在磁盘中可以存储大量的数据,而内存能存储的数据量有限,如果想要读取磁盘的数据,无法一次性将磁盘的数据读取出来,因为内存的数据存储量有限,只能一点一点读取

image-20220618153935513

  • 其他介质和内存的的数据处理能力不一致
    • 内存的数据处理能力极强,而磁盘的数据处理能力较弱,如果内存一次性将大量数据传递给磁盘,磁盘会处理不过来,因此问了避免这种问题,同样需要一点一点传递

image-20220618154000597

什么是文件流?

文件流就是指内存数据和磁盘文件数据之间的流动。

可读流

js
const fs = require('fs');
const path = require('path');

const filename = path.resolve(__dirname, './test.txt');
const rs = fs.createReadStream(filename, {
    encoding: 'utf-8', // 读取流数据时的编码方式
    highWaterMark:, 1 // 每次读取数据的大小,单位字节,默认读取64 * 1024字节,即64kb
    autoClose: true // 是否自动关闭,默认为true
});

// 文件打开事件
rs.on('open', () => {
    console.log('文件被打开了');
});

rs.on('error', () => {
    console.log('出错了');
});

// 读取数据事件,只有注册该事件时,才会开始读取数据,每次读取highWaterMark指定的大小
rs.on('data', chunk => {
    console.log('读取到一部分数据:', chunk);
    rs.pause(); // 暂停读取,触发pause事件
});


rs.on('pause', () => {
    console.log('数据暂停读取了');
    setTimeout(() => {
        rs.resume(); // 恢复读取,触发resume事件
    }, 1000);
});

rs.on('resume', () => {
    console.log('数据恢复读取了');
});

rs.on('end', () => {
    console.log('数据读取完毕了');
});

rs.on('close', () => {
    console.log('文件关闭了');
});

可写流

js
const fs = require('fs');
const path = require('path');

const filename = path.resolve(__dirname, './writefile/abc.txt');
const ws = fs.createWriteStream(filename, {
    encoding: 'utf-8',
    highWaterMark: 3,
    autoClose: true
});

ws.write(data)

ws.write(data)方法能够将数据写入文件,data可以是字符串或Buffer,我们可以将数据写入文件的过程想象成一个通道,由于磁盘的数据处理能力低,因此磁盘一次写入的数据有限(每次写入的数据大小由highWaterMark决定),当有其他的数据需要写入时,需要在写入队列中等待,而写入队列是在内存中的,内存的资源有限,因此这样容易产生一个叫做背压的问题。

js
// 一个中文占3个字节
const flag = ws.write('a');
console.log(flag); // true
console.log(ws.write('a')); // true
console.log(ws.write('a')); // false
console.log(ws.write('a')); // false
console.log(ws.write('a')); // false

ws.write(data)会返回一个布尔值,其含义分别是:

  • true:表示写入通道没有被填满,接下来的数据可以直接写入,无须排队

image-20220619204529345

  • false:表示写入通道目前已被填满,接下来的数据将进入写入队列

image-20220619204541161

那么要如何解决背压问题呢?这里通过一个案例来描述一下:

假如我们现在需要写入1M大小的字符串a,代码如下:

js
for(let i = 0; i < 1024 * 1024; i++) {
    ws.write('a');
}

上面的写法就产生了背压问题,导致内存资源消耗过大。如果我们通过ws.write(data)的返回值来判断的话,可以这样写:

js
let i = 0;
let flag = true;
while(i < 1024 * 1024 && flag) {
    flag = ws.write('a');
    i++;
}

上面的代码看似解决了问题,但是我们看看写入的文件可以发现,最终只写入了3个a。原因是在写入第三个a时,返回的flag已经是false,这就导致了后续无法进入while循环,代码运行中止了。

这个时候就需要借助drain事件,这个事件会在通道清空时触发。我们只要在通道清空时再次执行写入操作,就能够解决背压问题。

js
let i = 0;
function write() {
	let flag = true;
	while(i < 1024 * 1024 && flag) {
    	flag = ws.write('a');
    	i++;
	}
}
write();
ws.on('drain', () => {
    console.log();
    write();
})

ws.end([data])

结束写入,关闭文件。

js
ws.end(); // 结束写入
ws.end('这是结束写入前的最后一次写入');

案例

假如我们需要通过读取写入的方式将一个1M的文件abc.txt复制到一个新的文件abc2.txt,有两种方式可以实现:

js
const fs = require('fs');
const path = require('path');

const from = path.resolve(__dirname, './abc.txt');
const to = path.resolve(__dirname, './abc2.txt');

// 方式1
async function method1() {
    console.time('方式1');
    const content = await fs.promises.readFile(from);
    await fs.promises.writeFile(to);
    console.timeEnd('方式1');
    console.log('复制完成');
}
method1();

// 方式2
function method2() {
    console.time('方式2');
    const rs = fs.createReadStream(from);
    const ws = fs.createWriteStream(to);
    rs.on('data', chunk => { // 读取一部分数据
		const flag = ws.write(chunk); // 将数据写入文件
        if (!flag) {
            // 通道已满,表示下一次写入,会造成背压
            rs.pause(); // 暂停写入
        }
    });
    ws.on('drain', () => { // 通道清空时
        rs.resume(); // 恢复读取数据
    });
    rs.on('close', () => {
        console.timeEnd('方式2');
        console.log('复制完成');
    });
}
method2();

将上面两种方式的代码分别执行,可以看出,方式2的代码执行效率要比方式1高,并且方式2占用的内存比方式1少,因为方式1是将内存一次性全部读取,当读取的文件越大,内存占用率就越高。

可写流中提供了一个pipe方法能够帮助我们实现方式2中的代码:

js
function method3() {
    console.time('方式3');
    const rs = fs.createReadStream(from);
    const ws = fs.createWriteStream(to);
    rs.pipe(ws); // 将可读流连接到可写流
    rs.on('close', () => {
        console.timeEnd('方式3');
        console.log('复制完成');
    });
}
method3();