Unverified Commit 82ee16c5 authored by Sebastian Schüpbach

full refactoring

parent 0b64f75a
@@ -3,6 +3,12 @@ name = "media-file-distributor"
version = "0.1.0"
authors = ["Sebastian Schüpbach <post@annotat.net>"]
edition = "2018"
description = "Web service providing access to the media files uploaded by Memoriav's partner institutions"
readme = "README.md"
homepage = "https://gitlab.switch.ch/memoriav/memobase-2020/services/import-process/media-file-distributor"
repository = "https://gitlab.switch.ch/memoriav/memobase-2020/services/import-process/media-file-distributor"
license-file = "LICENSE"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
......
@@ -7,6 +7,8 @@ At the moment the following endpoints are supported:
* `/media/<record-id>`: Fetch the media file with the given id. The id consists of the recordSetId (a three-letter code and a three-digit sequential number) followed by the proper id of the record, e.g. `baz-001-MEI_67473`.
* `/thumbnail/<record-id>`: The same for thumbnails (media files in the `thumbnails` directory)
* `/refresh`: Refresh the file cache. This also happens automatically after a predefined duration (see below).
* `/summary/<collection-id>`: Gives a quantitative summary of the collection
* `/collections`: Shows the IDs of all available collections
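
A minimal sketch of how these endpoints might be called, assuming the service listens on `localhost:3000` and that the `reqwest` blocking client is available; the record and collection IDs below are purely illustrative:

```rust
// Illustrative client for the endpoints listed above (not part of the service
// itself). Assumes a running instance on localhost:3000 and the `reqwest`
// blocking client; the IDs are made up.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let base = "http://localhost:3000";

    // List the IDs of all available collections
    let collections = reqwest::blocking::get(format!("{}/collections", base))?.text()?;
    println!("collections: {}", collections);

    // Quantitative summary of a single collection
    let summary = reqwest::blocking::get(format!("{}/summary/baz-001", base))?.text()?;
    println!("summary: {}", summary);

    // Fetch a media file by record ID (recordSetId plus the record's own ID)
    let media = reqwest::blocking::get(format!("{}/media/baz-001-MEI_67473", base))?;
    println!("media request returned {}", media.status());

    // Trigger a manual refresh of the file cache
    reqwest::blocking::get(format!("{}/refresh", base))?;
    Ok(())
}
```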
## Installation
......
host = "0.0.0.0:3000" # Host name and port of the running application
base_path = "/swissbib_index/mb_sftp" # Root of the SFTP directory
refresh_after_sec = 3600 # Duration in seconds after which the cache is refreshed (i.e. the whole file tree is reread)
\ No newline at end of file
cache_timeout = 3600 # Duration in seconds after which the cache is refreshed (i.e. the whole file tree is reread)
partial_cache_timeout = 5 # Duration in seconds after which the cache for a specific collection is refreshed
\ No newline at end of file
@@ -16,6 +16,8 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//! A general comment
mod media_folder_utils;
mod service;
@@ -54,24 +56,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut media_cache = MediaFileCache::new(config.base_path.clone());
media_cache.refresh().context("Refreshing cache failed!")?;
info!(
"Done reading in media files. Found {} media files and {} thumbnails",
"Done reading in media files. Found {} metadata files, {} media files and {} thumbnails",
media_cache.metadata_file_size(),
media_cache.dissemination_copies_size(),
media_cache.thumbnails_size()
);
let media_cache = Arc::new(Mutex::new(media_cache));
let refresh_period = config.refresh_after_sec.unwrap_or(7200);
let outdated_after = Duration::new(refresh_period, 0);
info!("Setting refresh period to {}s", refresh_period);
let cache_timeout = Duration::new(config.cache_timeout.unwrap_or(7200), 0);
let partial_cache_timeout = Duration::new(config.partial_cache_timeout.unwrap_or(60), 0);
info!("Setting refresh period to {}s", cache_timeout.as_secs());
info!(
"Setting partial refresh period to {}s",
partial_cache_timeout.as_secs()
);
let server = Server::bind(&addr).serve(make_service_fn(move |_| {
let base_dir_cloned = config.base_path.clone();
let media_cache_cloned = media_cache.clone();
let cloned_base_dir = config.base_path.clone();
let cloned_media_cache = media_cache.clone();
async move {
Ok::<_, Error>(Svc {
media_cache: media_cache_cloned,
base_dir: base_dir_cloned,
outdated_after,
media_cache: cloned_media_cache,
base_dir: cloned_base_dir,
cache_timeout,
partial_cache_timeout,
})
}
}));
@@ -82,6 +90,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Ok(())
}
/// Parses a provided configuration file
pub fn parse_config_file(path: &str) -> Result<Config> {
let mut file = File::open(path)?;
let mut contents = String::new();
@@ -89,9 +98,15 @@ pub fn parse_config_file(path: &str) -> Result<Config> {
toml::from_str(&contents).context("Parsing of TOML config file failed")
}
/// Internal configuration representation
#[derive(Deserialize)]
pub struct Config {
/// host:port tuple of webserver
pub host: String,
/// Path to root directory of media file tree
pub base_path: String,
pub refresh_after_sec: Option<u64>,
/// Timeout (in seconds) after which a full cache update is enforced
pub cache_timeout: Option<u64>,
/// Timeout (in seconds) after which a collection cache is updated if a query for a certain ID failed
pub partial_cache_timeout: Option<u64>,
}
@@ -16,161 +16,312 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use anyhow::anyhow;
use anyhow::{Context, Result};
use log::{debug, warn};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
/// Caches the media files found in the indicated directory tree.
/// Represents a file path cache for a single collection (i.e. "record set")
#[derive(Debug)]
pub struct MediaFileCache {
pub struct Collection {
/// Index of dissemination copies ("digital objects") in collection
dissemination_copies: HashMap<String, String>,
/// Index of thumbnail files in collection
thumbnails: HashMap<String, String>,
metadata_file_count: HashMap<String, usize>,
/// Number of metadata files in collection
metadata_file_count: usize,
/// File path to collection
base_path: String,
/// Creation instant of the collection. Set to [`std::time::Instant::now()`] when the collection has been updated.
created_on: Instant,
}
impl MediaFileCache {
/// Create a new instance
impl Collection {
/// Creates a new collection
pub fn new(base_path: String) -> Self {
MediaFileCache {
dissemination_copies: HashMap::new(),
thumbnails: HashMap::new(),
metadata_file_count: HashMap::new(),
let base = Path::new(&base_path);
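// Indexes the files directly below `<base_path>/<subdirectory>`; a missing or
// unreadable directory yields an empty index so that constructing the
// collection never fails.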
let parse = |r: &str| {
let path = base.join(r);
let path = path.as_path();
let path_as_str = path.to_str().unwrap_or_else(|| {
warn!("Can't parse path");
"<unknown>"
});
if !path.exists() {
debug!("Directory does not exist: {}", path_as_str);
HashMap::new()
} else if let Ok(index) = Collection::scan_media_folder(path) {
debug!(
"{} {} files parsed in directory {}",
index.len(),
r,
path_as_str
);
index
} else {
warn!("Indexing of files in path {} failed", path_as_str);
HashMap::new()
}
};
Collection {
dissemination_copies: parse("media"),
thumbnails: parse("thumbnails"),
metadata_file_count: Collection::count_metadata_files(base).unwrap_or_else(|_| {
warn!("No metadata files found in {}", &base_path);
0
}),
base_path,
created_on: Instant::now(),
}
}
/// Extracts file ID from file path
fn create_id(path: &PathBuf) -> Result<(String, String)> {
let record_id = path
.file_stem()
.with_context(|| {
warn!("Can't extract file stem");
"Can't extract file stem"
})?
.to_str()
.with_context(|| {
warn!("Can't convert file stem to str");
"Can't convert file stem to str"
})?;
let path = path.to_str().with_context(|| {
warn!("Can't convert path to str");
"Can't read path!"
})?;
Ok((record_id.replace(" ", "_"), path.to_owned()))
}
/// Counts number of metadata files in collection
fn count_metadata_files(dir: &Path) -> Result<usize> {
let mut file_counter: usize = 0;
for entry in fs::read_dir(&dir)? {
let entry = entry?;
let path = entry.path();
let extension = path.extension().and_then(|e| e.to_str());
if extension.is_some() && extension.unwrap() == "xml" {
file_counter += 1;
}
}
Ok(file_counter)
}
/// Scans directory for media files and returns an index of found files
fn scan_media_folder(dir: &Path) -> Result<HashMap<String, String>> {
let mut file_index: HashMap<String, String> = HashMap::new();
for entry in fs::read_dir(&dir).context("Can't read dir")? {
let entry = entry?;
let path = entry.path();
if !path.is_dir() {
let (key, value) = Collection::create_id(&path).context("Couldn't index file")?;
file_index.insert(key, value);
}
}
Ok(file_index)
}
/// Returns the file path for a given ID and media type, where the latter is either `media` or `thumbnail`
pub fn get_file_path(&self, record_id: &str, media_type: &str) -> Option<String> {
if media_type == "media" {
self.dissemination_copies
.get(record_id)
.map(|id| id.to_owned())
} else if media_type == "thumbnail" {
self.thumbnails.get(record_id).map(|id| id.to_owned())
} else {
None
}
}
/// Updates the media index for a certain directory
pub fn update_media_index(&mut self, dir: &Path) -> Result<()> {
if dir.ends_with("media") {
self.dissemination_copies =
Collection::scan_media_folder(dir).context("Couldn't scan folder media")?;
} else if dir.ends_with("thumbnails") {
self.thumbnails =
Collection::scan_media_folder(dir).context("Couldn't scan folder thumbnails")?;
}
self.created_on = Instant::now();
Ok(())
}
/// Checks if the cache is outdated according to `outdated_after`
pub fn is_outdated(&self, outdated_after: &Duration) -> bool {
let now = Instant::now();
now.duration_since(self.created_on) >= *outdated_after
}
/// Size of mapping for dissemination copies (i.e. the total of cached dissemination copies)
/// Returns index size for dissemination copies (i.e. the total of cached dissemination copies)
pub fn dissemination_copies_size(&self) -> usize {
self.dissemination_copies.len()
}
/// Count media files for a collection
pub fn collection_dissemination_copies_size(&self, collection_id: &str) -> usize {
self.dissemination_copies.keys().filter(|k| k.starts_with(collection_id)).count()
}
/// Size of mapping for thumbnails (i.e. the total of cached thumbnails)
/// Returns index size for thumbnails (i.e. the total of cached thumbnails)
pub fn thumbnails_size(&self) -> usize {
self.thumbnails.len()
}
pub fn collection_thumbnails_size(&self, collection_id: &str) -> usize {
self.thumbnails.keys().filter(|k| k.starts_with(collection_id)).count()
/// Returns the number of metadata files in the collection
pub fn metadata_files(&self) -> usize {
self.metadata_file_count
}
}
/// Count metadata files for a collection
pub fn collection_metadata_file_size(&self, collection_id: &str) -> Option<usize> {
self.metadata_file_count.get(collection_id).map(|c| c.to_owned())
/// Caches all collections found in the indicated directory tree.
#[derive(Debug)]
pub struct MediaFileCache {
/// Collection index
collections: HashMap<String, Collection>,
/// Path to root directory
base_path: String,
/// Creation instant of the cache. Set to [`std::time::Instant::now()`] when the cache has been updated.
created_on: Instant,
}
impl MediaFileCache {
/// Creates a new instance
pub fn new(base_path: String) -> Self {
let collections =
if let Ok(index) = MediaFileCache::scan_collection_folders(Path::new(&base_path)) {
index
} else {
warn!("Collection indexing failed!");
HashMap::new()
};
MediaFileCache {
collections,
base_path,
created_on: Instant::now(),
}
}
pub fn get_collections(&self) -> Vec<String> {
self.metadata_file_count.keys().map(|k| k.to_owned()).collect::<Vec<String>>()
/// Scans the root path for collections and returns an index of collection IDs and their files
fn scan_collection_folders(dir: &Path) -> Result<HashMap<String, Collection>> {
let mut file_index: HashMap<String, Collection> = HashMap::new();
for entry in fs::read_dir(&dir).context("Can't read dir")? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
let path_as_string = path.to_str().context("Can't convert path to str")?.to_owned();
let dir_name = path
.file_name()
.context("Can't extract file name")?
.to_str()
.context("Can't convert OsStr to str")?
.to_owned();
file_index.insert(dir_name, Collection::new(path_as_string));
}
}
Ok(file_index)
}
/// Merges cache with `other`
pub fn merge(&mut self, other: MediaFileCache) {
self.dissemination_copies.extend(other.dissemination_copies);
self.thumbnails.extend(other.thumbnails);
self.metadata_file_count.extend(other.metadata_file_count);
self.created_on = Instant::now();
/// Checks if the cache is outdated according to `outdated_after`
pub fn is_outdated(&self, outdated_after: &Duration) -> bool {
let now = Instant::now();
now.duration_since(self.created_on) >= *outdated_after
}
/// Adds a single file to the cache
pub fn add_file(&mut self, path: &PathBuf) -> Result<()> {
let id = path
.file_stem()
.context("Can't extract file stem")?
.to_str()
.context("Can't convert file stem to str")?;
let extension = path
.extension()
.context("Can't extract extension")?
.to_str()
.context("Can't convert extension to str")?;
let path = path.to_str().context("Can't read path!")?;
let path_elements = path.split('/').collect::<Vec<&str>>();
let path_size = path_elements.len();
if path_elements
.get(path_size - 2)
.context("Path is invalid")?
== &"media"
{
let collection_id = *path_elements
.get(path_size - 3)
.context("Path is invalid")?;
self.dissemination_copies
.insert(format!("{}-{}", collection_id, id.replace(" ", "_")), path.to_owned());
} else if path_elements
.get(path_size - 2)
.context("Path is invalid")?
== &"thumbnails"
{
let collection_id = *path_elements
.get(path_size - 3)
.context("Path is invalid")?;
self.thumbnails
.insert(format!("{}-{}", collection_id, id.replace(" ", "_")), path.to_owned());
} else if extension == "xml" {
let record_set_id = *path_elements.get(path_size - 2)
.context("Path is invalid")?;
*self.metadata_file_count.entry(record_set_id.to_owned()).or_insert(1) += 1;
/// Checks if cache for a certain collection is outdated
pub fn collection_is_outdated(&self, collection_id: &str, outdated_after: &Duration) -> bool {
if let Some(c) = self.collections.get(collection_id) {
c.is_outdated(outdated_after)
} else {
false
}
Ok(())
}
/// Gets file path for id
pub fn get_file_path(&self, id: &str, media_type: &str) -> Option<String> {
if media_type == "media" {
self.dissemination_copies
.get(id)
.map(|entry| entry.to_owned())
} else if media_type == "thumbnail" {
self.thumbnails.get(id).map(|entry| entry.to_owned())
} else {
None
/// Returns number of all contained dissemination copies
pub fn dissemination_copies_size(&self) -> usize {
let mut counter: usize = 0;
for collection in self.collections.values() {
counter += collection.dissemination_copies_size();
}
counter
}
/// Reloads the cache
/// Returns number of dissemination copies in a certain collection
pub fn collection_dissemination_copies_size(&self, collection_id: &str) -> usize {
match self.collections.get(collection_id) {
Some(c) => c.dissemination_copies_size(),
None => 0,
}
}
/// Returns number of all contained thumbnails
pub fn thumbnails_size(&self) -> usize {
let mut counter: usize = 0;
for collection in self.collections.values() {
counter += collection.thumbnails_size();
}
counter
}
/// Returns number of thumbnails in a certain collection
pub fn collection_thumbnails_size(&self, collection_id: &str) -> usize {
match self.collections.get(collection_id) {
Some(c) => c.thumbnails_size(),
None => 0,
}
}
/// Returns number of all contained metadata files
pub fn metadata_file_size(&self) -> usize {
let mut counter: usize = 0;
for collection in self.collections.values() {
counter += collection.metadata_files();
}
counter
}
/// Returns number of metadata files in a certain collection
pub fn collection_metadata_file_size(&self, collection_id: &str) -> Option<usize> {
self.collections
.get(collection_id)
.map(|c| c.metadata_files())
}
/// Returns a list of all collection IDs
pub fn get_collections(&self) -> Vec<String> {
self.collections
.keys()
.map(|k| k.to_owned())
.collect::<Vec<String>>()
}
/// Returns the file path for an ID and a media type, where the latter can be either `media` or `thumbnail`
pub fn get(&self, id: &str, media_type: &str) -> Option<String> {
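// The collection ID is the first seven characters of the record ID (three-letter
// code, dash, three-digit number, e.g. `baz-001`); everything after the following
// dash is the record-local part that the collection index is keyed on.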
let collection_id = id.split_at(7).0;
self.collections
.get(collection_id)
.and_then(|c| c.get_file_path(id.split_at(8).1, media_type))
}
/// Refreshes the entire cache
pub fn refresh(&mut self) -> Result<()> {
let temp_cache =
visit_dirs(Path::new(&self.base_path)).context("Coulnd't scan for files")?;
self.dissemination_copies = temp_cache.dissemination_copies;
self.thumbnails = temp_cache.thumbnails;
self.metadata_file_count = temp_cache.metadata_file_count;
self.collections = MediaFileCache::scan_collection_folders(Path::new(&self.base_path))
.context("Cache refreshing failed")?;
self.created_on = Instant::now();
Ok(())
}
}
/// Scans media file directory recursively
fn visit_dirs(dir: &Path) -> Result<MediaFileCache> {
let mut media_file_cache = MediaFileCache::new(
dir.to_str()
.context("Couldn't convert path to str")?
.to_owned(),
);
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
media_file_cache.merge(visit_dirs(&path)?);
/// Refreshes the cache for a single collection only
pub fn refresh_collection(&mut self, collection_id: &str) -> Result<()> {
if let Some(c) = self.collections.get_mut(collection_id) {
let collection_path = Path::new(&self.base_path);
let collection_path = collection_path.join(collection_id);
let collection_path = collection_path.as_path();
c.update_media_index(collection_path.join("media").as_path())
.and_then(|_| c.update_media_index(collection_path.join("thumbnails").as_path()))
} else {
media_file_cache
.add_file(&path)
.context("Can't add file to cache!")?;
warn!("Collection id {} not found", collection_id);
Err(anyhow!("Collection id not found"))
}
}
Ok(media_file_cache)
}
@@ -17,7 +17,7 @@
*/
use crate::media_folder_utils::MediaFileCache;
use anyhow::{Context as AContext, Result};
use anyhow::{anyhow, Context as AContext, Result};
use hyper::http::header;
use hyper::service::Service;
use hyper::{Body, Request, Response, StatusCode};
@@ -30,20 +30,69 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::Duration;
/// A [`hyper::service::Service`] implementation which wraps the [`MediaFileCache`]
pub struct Svc {
/// [`MediaFileCache`]
pub media_cache: Arc<Mutex<MediaFileCache>>,
/// Path to root directory of media tree
pub base_dir: String,
pub outdated_after: Duration,
/// Duration after which the cache becomes invalid and its refresh is enforced
pub cache_timeout: Duration,
/// Duration after which a collection cache becomes eligible for a refresh
pub partial_cache_timeout: Duration,
}
impl Svc {
fn fetch_file(&mut self, id: String, file_type: &str) -> Result<Response<Body>> {
/// Refreshes the overall cache if necessary
fn refresh_cache_if_outdated(&mut self) -> Result<()> {
if let Ok(mut x) = self.media_cache.lock() {
if x.is_outdated(&self.outdated_after) {
if x.is_outdated(&self.cache_timeout) {
info!("Media cache is outdated! Regenerating");
x.refresh().context("Can't refresh cache")?;
}
match x.get_file_path(&id, file_type) {
Ok(())
} else {
Err(anyhow!("Can't lock mutex"))
}
}
/// Refreshes a collection cache if necessary
fn refresh_partial_cache_if_outdated(&mut self, collection_id: &str) -> Result<()> {
if let Ok(mut x) = self.media_cache.lock() {
if x.collection_is_outdated(collection_id, &self.partial_cache_timeout) {
info!("Partial media cache is outdated! Regenerating");
x.refresh_collection(collection_id)
} else {
Ok(())
}
} else {
warn!("Can't lock mutex");
Err(anyhow!("Can't lock mutex"))
}
}
/// Tries to get a file path corresponding to an ID
fn try_fetch_file(&self, id: &str, file_type: &str) -> Result<Option<String>> {
if let Ok(x) = self.media_cache.lock() {
Ok(x.get(id, file_type))
} else {
Err(anyhow!("Can't lock mutex"))
}
}
/// Tries to fetch a file for the given ID and file type and builds an HTTP response from it
fn fetch_file(
&mut self,
id: String,
file_type: &str,
force_partial_refresh: bool,
) -> Result<Response<Body>> {
if self.refresh_cache_if_outdated().is_err() {
error!("Can't access media cache mutex!");
return internal_server_error();
}
match self.try_fetch_file(&id, file_type) {
Ok(r) => match r {
Some(p) => {
info!("{} file for id {} found", &file_type, &id);
if let Ok(f) = File::open(p) {
@@ -52,23 +101,23 @@ impl Svc {
reader
.read_to_end(&mut buf)
.context("Can't read media file")?;