aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Schauer <matthew.schauer@e10x.net>2021-04-09 15:46:10 -0700
committerMatthew Schauer <matthew.schauer@e10x.net>2021-04-09 15:46:10 -0700
commit6364ae1642dfa4e607daf9567d1e02a873848ed7 (patch)
tree95d8d7fc9b16c132b369882425287ec2e17e724f
parent608a26a4f1d9aa880e44c7d56a8f928f53e24b2b (diff)
Pool data readers to significantly increase performance
-rw-r--r--src/read.rs83
1 files changed, 71 insertions, 12 deletions
diff --git a/src/read.rs b/src/read.rs
index 6b2c6fb..1c6ad41 100644
--- a/src/read.rs
+++ b/src/read.rs
@@ -21,6 +21,8 @@ use std::collections::{HashMap, HashSet};
use std::ffi::{CStr, CString};
use std::io;
use std::io::{Read, Seek};
+use std::mem::ManuallyDrop;
+use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf, Component};
use std::sync::{Arc, Mutex};
use super::*;
@@ -51,6 +53,38 @@ fn enoent_ok<T>(t: Result<T>) -> Result<Option<T>> {
}
}
+// Wrapper for leasing objects from a pool
+struct Leased<'a, T> {
+ pool: &'a Mutex<Vec<T>>,
+ data: ManuallyDrop<T>,
+}
+
+impl<'a, T> Leased<'a, T> {
+ pub fn new(pool: &'a Mutex<Vec<T>>, data: T) -> Self {
+ Self { pool: pool, data: ManuallyDrop::new(data) }
+ }
+}
+
+impl<'a, T> Deref for Leased<'a, T> {
+ type Target = T;
+
+ fn deref(&self) -> &Self::Target {
+ &self.data
+ }
+}
+
+impl<'a, T> DerefMut for Leased<'a, T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.data
+ }
+}
+
+impl<'a, T> Drop for Leased<'a, T> {
+ fn drop(&mut self) {
+ unsafe { self.pool.lock().expect(LOCK_ERR).push(ManuallyDrop::take(&mut self.data)); }
+ }
+}
+
/// A directory in the archive.
///
/// Directory objects are obtained by calling the [`data`](Node::data) or [`as_dir`](Node::as_dir)
@@ -127,6 +161,29 @@ impl<'a> std::iter::Iterator for Dir<'a> {
}
}
+struct DataReader {
+ #[allow(dead_code)] compressor: ManagedPointer<sqfs_compressor_t>, // Referenced by `reader`
+ reader: ManagedPointer<sqfs_data_reader_t>,
+}
+
+impl<'a> DataReader {
+ fn new(archive: &'a Archive) -> Result<Self> {
+ let compressor = archive.compressor()?;
+ let reader = sfs_init_check_null(&|| unsafe {
+ sqfs_data_reader_create(*archive.file, archive.superblock.block_size as u64, *compressor, 0)
+ }, "Couldn't create data reader", sfs_destroy)?;
+ unsafe { sfs_check(sqfs_data_reader_load_fragment_table(*reader, &archive.superblock), "Couldn't load fragment table")? };
+ Ok(Self { compressor: compressor, reader: reader })
+ }
+
+ fn read(&self, inode: &ManagedPointer<sqfs_inode_generic_t>, offset: u64, buf: &mut [u8]) -> io::Result<u64> {
+ Ok(unsafe { sfs_check(
+ sqfs_data_reader_read(*self.reader, inode.as_const(), offset, buf.as_mut_ptr() as *mut libc::c_void, buf.len() as u32),
+ "Couldn't read file content"
+ ).map_err(|e| io::Error::new(io::ErrorKind::Other, e))? } as u64)
+ }
+}
+
/// A file in the archive.
///
/// `File` objects allow standard operations on file inodes in an archive. `File` implements
@@ -145,19 +202,12 @@ impl<'a> std::iter::Iterator for Dir<'a> {
/// file.read(&mut buf)?;
pub struct File<'a> {
node: &'a Node<'a>,
- #[allow(dead_code)] compressor: ManagedPointer<sqfs_compressor_t>, // Referenced by `reader`
- reader: Mutex<ManagedPointer<sqfs_data_reader_t>>,
offset: Mutex<u64>,
}
impl<'a> File<'a> {
fn new(node: &'a Node) -> Result<Self> {
- let compressor = node.container.compressor()?;
- let reader = sfs_init_check_null(&|| unsafe {
- sqfs_data_reader_create(*node.container.file, node.container.superblock.block_size as u64, *compressor, 0)
- }, "Couldn't create data reader", sfs_destroy)?;
- unsafe { sfs_check(sqfs_data_reader_load_fragment_table(*reader, &node.container.superblock), "Couldn't load fragment table")? };
- Ok(Self { node: node, compressor: compressor, reader: Mutex::new(reader), offset: Mutex::new(0) })
+ Ok(Self { node: node, offset: Mutex::new(0) })
}
/// Retrieve the size of the file in bytes.
@@ -218,9 +268,8 @@ impl<'a> Read for File<'a> {
let mut locked_offset = self.offset.lock().expect(LOCK_ERR);
if *locked_offset >= self.size() { Ok(0) }
else {
- let locked_reader = self.reader.lock().expect(LOCK_ERR);
- let res = unsafe { sfs_check(sqfs_data_reader_read(**locked_reader, self.node.inode.as_const(), *locked_offset, buf.as_mut_ptr() as *mut libc::c_void, buf.len() as u32), "Couldn't read file content").map_err(|e| io::Error::new(io::ErrorKind::Other, e))? };
- *locked_offset += res as u64;
+ let res = self.node.container.data_reader().unwrap().read(&self.node.inode, *locked_offset, buf)?;
+ *locked_offset += res;
Ok(res as usize)
}
}
@@ -693,6 +742,7 @@ pub struct Archive {
superblock: sqfs_super_t,
compressor_config: sqfs_compressor_config_t,
mmap: (std::fs::File, Mmap),
+ data_readers: Mutex<Vec<DataReader>>,
}
impl Archive {
@@ -711,7 +761,7 @@ impl Archive {
let os_file = std::fs::File::open(&path)?;
let map = unsafe { MmapOptions::new().map(&os_file).map_err(|e| SquashfsError::Mmap(e))? };
//let map = MemoryMap::new(superblock.bytes_used as usize, &vec![MapOption::MapReadable, MapOption::MapFd(os_file.as_raw_fd())])?;
- Ok(Self { path: path.as_ref().to_path_buf(), file: file, superblock: superblock, compressor_config: compressor_config, mmap: (os_file, map) })
+ Ok(Self { path: path.as_ref().to_path_buf(), file: file, superblock: superblock, compressor_config: compressor_config, mmap: (os_file, map), data_readers: Mutex::new(vec![]) })
}
fn compressor(&self) -> Result<ManagedPointer<sqfs_compressor_t>> {
@@ -727,6 +777,15 @@ impl Archive {
}, "Couldn't create metadata reader", sfs_destroy)?)
}
+ fn data_reader(&self) -> Result<Leased<DataReader>> {
+ let mut locked_readers = self.data_readers.lock().expect(LOCK_ERR);
+ let ret = match locked_readers.pop() {
+ Some(reader) => reader,
+ None => { println!("Made data reader"); DataReader::new(&self)? },
+ };
+ Ok(Leased::new(&self.data_readers, ret))
+ }
+
fn id_lookup(&self, idx: u16) -> Result<u32> {
let id_table = sfs_init_check_null(&|| unsafe {
sqfs_id_table_create(0)