aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJakob Stendahl <jakob.stendahl@outlook.com>2022-01-24 14:18:47 +0100
committerJakob Stendahl <jakob.stendahl@outlook.com>2022-01-24 14:18:47 +0100
commit60a3f7bf15c43182dfc5797fce27b603c104db02 (patch)
treed358692a26287632287bc32645bd63db7507ca9b /src
downloadRSS-watcher-60a3f7bf15c43182dfc5797fce27b603c104db02.tar.gz
RSS-watcher-60a3f7bf15c43182dfc5797fce27b603c104db02.zip
Initial commit
Diffstat (limited to 'src')
-rw-r--r--src/database.rs153
-rw-r--r--src/main.rs223
2 files changed, 376 insertions, 0 deletions
diff --git a/src/database.rs b/src/database.rs
new file mode 100644
index 0000000..d43b7d7
--- /dev/null
+++ b/src/database.rs
@@ -0,0 +1,153 @@
+use std::process;
+use std::env;
+use log::{info, warn, error};
+use mysql::*;
+use mysql::prelude::*;
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct FeedConf {
+ pub id: u32,
+ pub url: String,
+ pub last_fetch: Option<i64>,
+ pub title: String,
+ pub message: String,
+ pub push_url: String,
+ pub push_token: String
+}
+
+/**
+ * Create Opts struct from env vars.
+ */
+fn build_opts() -> Opts {
+ let db_host = env::var("DB_HOST").expect("$DB_HOST is not set");
+ let db_base = env::var("DB_BASE").expect("$DB_BASE is not set");
+ let db_user = env::var("DB_USER").expect("$DB_USER is not set");
+ let db_pass = env::var("DB_PASS").expect("$DB_PASS is not set");
+ return Opts::from(OptsBuilder::new()
+ .ip_or_hostname(Some(db_host))
+ .user(Some(db_user))
+ .pass(Some(db_pass))
+ .db_name(Some(db_base)));
+}
+
+pub fn new_conn() -> Conn {
+ let conn_res = Conn::new(build_opts());
+ if let Err(ref x) = conn_res {
+ error!("Could not connect to database ({:#?})...", x);
+ process::exit(1);
+ }
+ return conn_res.unwrap();
+}
+
+/**
+ * Check wether the table `rss-watcher-feeds` exists.
+ */
+fn table_exists(conn: &mut Conn) -> bool {
+ let db_base = env::var("DB_BASE").expect("$DB_BASE is not set");
+ let q = conn.prep("SELECT table_name \
+ FROM INFORMATION_SCHEMA.TABLES \
+ WHERE TABLE_SCHEMA=:schema \
+ AND TABLE_NAME='rss-watcher-feeds'").unwrap();
+ let res: Option<String> = conn.exec_first(
+ &q, params! {"schema" => db_base}).unwrap();
+ if let None = res { return false; }
+ return true;
+}
+
+/**
+ * This will create the `rss-watcher-feeds` table.
+ */
+fn table_create(conn: &mut Conn) {
+ let db_base = env::var("DB_BASE").expect("$DB_BASE is not set");
+ info!("Creating table `{}`.`rss-watcher-feeds`", db_base);
+ let mut tx = conn.start_transaction(TxOpts::default()).unwrap();
+ let mut q = "CREATE TABLE `rss-watcher-feeds` ( \
+ `id` int NOT NULL AUTO_INCREMENT, \
+ `url` VARCHAR(255) NOT NULL, \
+ `last_fetch` int, \
+ `title` VARCHAR(255) NOT NULL DEFAULT '{{title}}', \
+ `message` VARCHAR(255) NOT NULL DEFAULT '{{summary}}', \
+ `push_url` VARCHAR(255) NOT NULL, \
+ `push_token` VARCHAR(255) NOT NULL, \
+ PRIMARY KEY (`id`)
+ )";
+ if let Err(x) = tx.query_drop(q) {
+ error!("Could not create table! ({:#?}", x);
+ process::exit(1);
+ }
+ q = "INSERT INTO `rss-watcher-feeds` (id,
+ url,
+ last_fetch,
+ title,
+ message,
+ push_url,
+ push_token)
+ VALUES (0,'version',1,'','','','')";
+ if let Err(x) = tx.query_drop(q) {
+ error!("Could not insert versioning row! ({:#?}", x);
+ process::exit(1);
+ }
+ if let Err(x) = tx.commit() {
+ error!("Could not create table! ({:#?}", x);
+ process::exit(1);
+ }
+
+}
+
+/**
+ * Bootstrap the database, this will make sure tables exists,
+ * create them if not and run migrations if nececarry.
+ */
+pub fn bootstrap() {
+ info!("Bootstrapping database");
+ let conn_res = Conn::new(build_opts());
+ if let Err(ref x) = conn_res {
+ error!("Could not connect to database ({:#?})...", x);
+ process::exit(1);
+ }
+ let mut conn = conn_res.unwrap();
+ info!("Connected to database");
+
+ if !table_exists(&mut conn) {
+ table_create(&mut conn);
+ }
+
+ info!("Database should now be bootstrapped");
+ info!("We are assuming that the table has the correct columns");
+ info!("If not, we are going to get sql errors");
+}
+
+/**
+ * This will fetch all feeds from the database and return them as a Vector.
+ */
+pub fn get_feeds(conn: &mut Conn) -> Vec<FeedConf> {
+ let q = "SELECT `id`, \
+ `url`, \
+ `last_fetch`, \
+ `title`, \
+ `message`, \
+ `push_url`, \
+ `push_token` \
+ FROM `rss-watcher-feeds` \
+ WHERE id > 1";
+ let res = conn.query_map(q,
+ |(id,url,last_fetch,title,message,push_url,push_token)| {
+ FeedConf{id,url,last_fetch,title,message,push_url,push_token}
+ },).unwrap();
+ return res;
+}
+
+/**
+ * Method that updates the last fetch time timestamp in the database
+ */
+pub fn update_last_fetch(feed_id: u32, last_fetch: i64, conn: &mut Conn) {
+ let mut tx = conn.start_transaction(TxOpts::default()).unwrap();
+ let q = "UPDATE `rss-watcher-feeds` SET last_fetch=? WHERE id=?";
+ if let Err(x) = tx.exec_drop(q, (last_fetch,feed_id,)) {
+ warn!("Could not update last fetch time...! ({:#?}", x);
+ }
+ if let Err(x) = tx.commit() {
+ warn!("Could not commit update! ({:#?}", x);
+ }
+
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..61ac6a6
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,223 @@
+mod database;
+use database::FeedConf;
+
+use std::env;
+use std::process;
+use std::error::Error;
+use feed_rs::parser;
+use feed_rs::model::Feed;
+use feed_rs::model::Entry;
+use feed_rs::model::Text;
+use chrono::prelude::{Utc,DateTime,NaiveDateTime};
+use std::time::Duration;
+use tokio::{time};
+use log::{info, warn, error};
+use html2md;
+extern crate mime;
+
+/**
+ * Extract text field from Option
+ */
+fn extract_text(text: &Option<Text>) -> String {
+ if text.is_none() { return String::from("Text field not found"); }
+ let field = text.as_ref().unwrap();
+ match (field.content_type.type_(), field.content_type.subtype()) {
+ (mime::TEXT, mime::HTML) => return html2md::parse_html(field.content.as_ref()),
+ (mime::TEXT, mime::PLAIN) => return field.content.to_owned(),
+ _ => return String::from(format!("Unknown field content type {:#?}", field.content_type)),
+ }
+}
+
+/**
+ * This will extract fields from RSS entry, and replace special tags
+ * from the input string with those entries.
+ */
+fn replace_tags(input: String, entry: &Entry) -> String {
+ let mut out = input;
+ out = out.replace("{{id}}", entry.id.as_ref());
+ out = out.replace("{{title}}", extract_text(&entry.title).as_ref());
+ out = out.replace("{{summary}}", extract_text(&entry.summary).as_ref());
+ return out;
+}
+
+/**
+ * Method that escapes some characters that would break json spec
+ */
+fn escape(input: String) -> String {
+ return input.replace("\\","\\\\")
+ .replace("\"", "\\\"")
+ .replace("\n", "\\n");
+}
+
+/**
+ * Push feed entry to gotify
+ */
+async fn gotify_push(entry: &Entry, feed_conf: &FeedConf) -> Result<(), reqwest::Error> {
+ let uri = format!("{}/message", &feed_conf.push_url);
+
+ // Extract content and create title and message strings
+ let mut title_content = feed_conf.title.to_owned();
+ let mut message_content = feed_conf.message.to_owned();
+ title_content = replace_tags(title_content, entry);
+ message_content = replace_tags(message_content, entry);
+
+ // Build json string that will be sent as payload to gotify
+ let mut req = "{".to_owned();
+
+ req.push_str(format!("\"title\":\"{}\"", escape(title_content.to_owned())).as_str());
+ req.push_str(format!(",\"message\":\"{}\"", escape(message_content.to_owned())).as_str());
+ req.push_str(",\"priority\":1");
+
+ req.push_str(",\"extras\": {");
+ req.push_str("\"client::display\": { \"contentType\": \"text/markdown\" }");
+ if entry.links.len() > 0 {
+ req.push_str(",\"client::notification\": { \"click\": { \"url\": \"");
+ req.push_str(escape(entry.links[0].href.to_owned()).as_str());
+ req.push_str("\"}}")
+ }
+ req.push_str("}}");
+
+ // Send request to gotify
+ let client = reqwest::Client::new();
+ let res = client.post(uri)
+ .query(&[("token",&feed_conf.push_token)])
+ .body(req.to_owned())
+ .header("Content-Type", "application/json")
+ .send()
+ .await?;
+ if res.status().is_success() {
+ info!("Sent notification with title \"{}\"", title_content);
+ } else {
+ error!("payload: {}", req);
+ error!("Could not send notification... {:#?}", res);
+ }
+ Ok(())
+}
+
+/**
+ * Function takes a FeedConf struct, and makes a get request to fetch
+ * the feed. It then uses feed_rs to parse that feed and returns that
+ * parsed feed.
+ */
+async fn fetch_feed(feed_conf: &FeedConf, last_fetch_time: DateTime<Utc>) -> Result<Option<Feed>, Box<dyn Error>> {
+ info!("Fetching feed \"{}\"", &feed_conf.url);
+ let client = reqwest::Client::new();
+ let last_fetch_rfc2822 = last_fetch_time.to_rfc2822().replace("+0000", "GMT");
+ let resp = client.get(&feed_conf.url)
+ .header("If-Modified-Since", &last_fetch_rfc2822)
+ .send()
+ .await?;
+ println!("{:#?}", resp);
+ if resp.status() == 304 {
+ info!("No changes since last fetch {}", &last_fetch_rfc2822);
+ Ok(None)
+ } else {
+ let feed = parser::parse(&resp.bytes().await?[..])?;
+ Ok(Some(feed))
+ }
+}
+
+/**
+ * This calls fetch_feed, and figures out wether it succeeded or not.
+ * It then pushes all _new_ entries to gotify, and returns the last fetched
+ * time (now, or current if there is no new articles).
+ */
+async fn get_feed(feed_conf: &FeedConf) -> bool {
+ // Check wether last_fetch_time is set, if it is not, we will use the "now"
+ // time as that. Which means that no articles will be found.
+ let last_fetch_time;
+ match &feed_conf.last_fetch {
+ Some(x) => {
+ last_fetch_time = DateTime::from_utc(
+ NaiveDateTime::from_timestamp(x.to_owned(),0), Utc);
+ error!("{:#?}", last_fetch_time);
+ },
+ None => last_fetch_time = Utc::now(),
+ }
+
+ // Fetch the feed and parse it
+ let res = fetch_feed(&feed_conf, last_fetch_time).await;
+ let feed: Option<Feed>;
+ match res {
+ Err(e) => {
+ error!("Could not fetch feed ({:?})", e);
+ return false;
+ },
+ Ok(x) => feed = x
+ }
+
+ // If feed is empty (we got status code 304), we should skip any further
+ // processing
+ if let None = feed { return false; }
+
+ // Process all entries in the feed
+ for entry in feed.unwrap().entries {
+ // Skip sending notification if the publish time is before the
+ // last_fetch_time
+ if let Some(x) = entry.published {
+ if last_fetch_time > x {
+ info!("Skipping entry that was published at {}", x);
+ continue;
+ }
+ }
+ // Attempt to send notification, give up feed for this main loop
+ // iteration without saving last_fetch_time
+ if let Err(e) = gotify_push(&entry, &feed_conf).await {
+ error!("Could not send push notification ({:#?})", e);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/**
+ * This gets all feeds from the database and fetches them once.
+ */
+async fn main_loop() {
+ let mut conn = database::new_conn();
+ for feed in database::get_feeds(&mut conn) {
+ let time_now = Utc::now();
+ let res = get_feed(&feed).await;
+ if res {
+ database::update_last_fetch(feed.id, time_now.timestamp(), &mut conn);
+ }
+ }
+}
+
+/**
+ * Main app, sets up database, and then it keeps an active loop.
+ */
+async fn app() {
+ database::bootstrap();
+
+ let interval_timeout;
+ match env::var("FETCH_INTERVAL") {
+ Ok(val) => {
+ let res = val.parse::<u64>();
+ if let Err(_e) = res {
+ error!("Invalid $FETCH_INTERVAL value {:#?}", val);
+ process::exit(1);
+ }
+ interval_timeout = res.unwrap();
+ },
+ Err(_e) => {
+ warn!("$FETCH_INTERVAL not set, using default of 2m");
+ interval_timeout = 120000;
+ },
+ }
+
+ let mut interval = time::interval(Duration::from_millis(interval_timeout));
+ loop {
+ main_loop().await;
+ interval.tick().await;
+ }
+}
+
+fn main() {
+ env_logger::init();
+ info!("Starting rss-watcher");
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ let future = app();
+ rt.block_on(future);
+}