cgn-appointments/src/cgnappointments/__main__.py

409 lines
12 KiB
Python

import argparse
import csv
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from time import sleep
import requests
import yaml
from platformdirs import user_config_dir, user_data_dir
from requests import HTTPError
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from validators import url as validate_url
from .logger import setup_logging
# Directory that contains the package directory of this file
# (the parent of this file's own directory).
PROJECT_ROOT = Path(__file__).parent.parent
# Module-wide logger, named after the package this module belongs to.
logger = logging.getLogger(__package__)
def parse_arguments() -> dict:
"""
Parse the arguments.
:return: dict
"""
argparser = argparse.ArgumentParser(
prog="cgn-appointments",
description="Scrapes appointments from termine.stadt-koeln.de an sends a message to a ntfy server.",
)
argparser.add_argument(
"-s",
"--services",
action="store",
nargs='+',
type=str,
help="Services to check",
required=False,
)
argparser.add_argument(
"-l",
"--locations",
action="store",
nargs='+',
type=str,
help="Locations to check",
required=False,
)
argparser.add_argument(
"--config-file",
action="store",
type=Path,
help="Path to the configuration file",
required=False,
)
argparser.add_argument(
"--csv-file",
action="store",
type=Path,
help="Path to the csv file, which stores the last fetched dates",
required=False,
)
argparser.add_argument(
"--log-file",
action="store",
type=Path,
help="Path to logfile",
required=False,
)
argparser.add_argument(
"--log-level",
action="store",
type=str,
choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"],
help="Logging Level",
required=False,
)
return argparser.parse_args().__dict__
def update_config_with_args(config: dict, args: dict) -> dict:
    """
    Overlay the command line values on the loaded configuration.

    Only arguments that are not ``None`` replace a configuration value. The
    dict passed as ``config`` is modified in place and returned.

    :param config: Configuration dict; the logging overrides expect the nested
        keys ``logging.handlers.file`` and ``logging.loggers.root`` to exist.
    :param args: Parsed command line arguments.
    :return: dict
    """
    # (configuration key, argument name) pairs for the top-level overrides.
    top_level = (
        ("services", "services"),
        ("locations", "locations"),
        ("csv_path", "csv_file"),
    )
    overrides = {target: args.get(source) for target, source in top_level}
    config.update({key: value for key, value in overrides.items() if value is not None})

    log_file = args.get("log_file")
    if log_file is not None:
        config["logging"]["handlers"]["file"]["filename"] = log_file

    log_level = args.get("log_level")
    if log_level is not None:
        config["logging"]["loggers"]["root"]["level"] = log_level

    return config
def get_config() -> dict:
    """
    Load the YAML configuration and overlay the command line arguments.

    The file given with ``--config-file`` is used if present, otherwise
    ``cgn-appointments/config.yaml`` below the user configuration directory.
    If that file does not exist, a copy of ``config_template.yaml`` (located
    next to this module) is written there and the program exits with status 0
    so the user can fill it in.

    Exits with status 1 if the file vanished before it could be read or if it
    does not contain a YAML mapping (for example because it is empty).

    :return: dict
    """
    args = parse_arguments()

    config_yaml = args.get("config_file")
    if config_yaml is None:
        config_yaml = Path(user_config_dir()) / "cgn-appointments" / "config.yaml"

    if not config_yaml.exists():
        print(f"""config.yaml not found.
Creating a new one under '{config_yaml}'.
Please fill in the required information.""")
        config_yaml.parent.mkdir(parents=True, exist_ok=True)
        template = Path(__file__).parent / "config_template.yaml"
        # Explicit UTF-8 so non-ASCII characters survive on platforms whose
        # locale encoding is something else.
        config_yaml.write_text(template.read_text(encoding="utf-8"), encoding="utf-8")
        raise SystemExit(0)

    try:
        with open(config_yaml, "r", encoding="utf-8") as file:
            loaded = yaml.safe_load(file)
    except FileNotFoundError:
        print(f"config.yaml not found in '{config_yaml}'.")
        raise SystemExit(1) from None

    # safe_load returns None for an empty file and may return a list or a
    # scalar; none of those can be used as configuration.
    if not isinstance(loaded, dict):
        print(f"config.yaml in '{config_yaml}' is empty or does not contain a mapping.")
        raise SystemExit(1)

    # Replace config values with given arguments
    return update_config_with_args(loaded, args)
def define_csv_path(csv_path: str|None, csv_name: str|None) -> Path:
"""
Define the path to the csv file.
:param csv_path:
:param csv_name:
:return: Path
"""
csv_path = Path(csv_path) if csv_path else None
csv_name = Path(csv_name) if csv_name else None
if csv_path is not None and csv_path.is_dir() and csv_name is not None:
return csv_path / csv_name
elif csv_path is not None and csv_path.is_dir() and csv_name is None:
return csv_path / "cgn-appointments.csv"
elif csv_path is not None:
csv_path.touch()
return csv_path
elif csv_name is not None:
csv_path = Path(user_data_dir()) / csv_name
csv_path.touch()
return csv_path
else:
csv_path = Path(user_data_dir()) / "cgn-appointments"
csv_path.touch()
return csv_path / "cgn-appointments.csv"
def select_options(services, selects):
    """
    Choose the option at index 1 in the dropdowns of the wanted services.

    A dropdown is considered when the text of the element two levels above it
    contains at least one of the service names. Among those, only dropdowns
    with a ``data-testid`` attribute starting with ``service--`` are changed.

    :param services: Service names to look for.
    :param selects: Dropdown (``select``) elements of the page.
    :return:
    """
    for dropdown in selects:
        surrounding_text = dropdown.find_element(By.XPATH, "../..").text
        matched = [name for name in services if name in surrounding_text]
        if not matched:
            continue

        logger.debug(f"Selecting option for {', '.join(matched)}")
        for attribute in dropdown.get_property('attributes'):
            is_service_dropdown = (
                attribute['name'] == 'data-testid'
                and attribute['value'].startswith("service--")
            )
            if is_service_dropdown:
                Select(dropdown).select_by_index(1)
def ntfy(server: str,
         topic: str,
         title: str,
         message: str,
         click_url: str,
         tags: list | None = None,
         priority: int = 3,
         timeout: float = 10.0) -> None:
    """
    Sends a notification to the ntfy server.

    The notification is posted as a JSON document to ``server``. Sending is
    best effort: HTTP error responses, connection failures and timeouts are
    logged and not raised.

    :param server: URL the JSON document is posted to.
    :param topic: Value of the ``topic`` field.
    :param title: Value of the ``title`` field.
    :param message: Value of the ``message`` field.
    :param click_url: Value of the ``click`` field.
    :param tags: Value of the ``tags`` field.
    :param priority: Value of the ``priority`` field.
    :param timeout: Seconds to wait for the server before giving up, so an
        unresponsive server cannot block the program forever.
    :return:
    """
    data = {
        "topic": topic,
        "title": title,
        "message": message,
        "tags": tags,
        "click": click_url,
        "priority": priority
    }
    log_extra = {"ntfy": {"server": server} | data}
    logger.debug(f"Sending notification to '{server}'.",
                 extra=log_extra)
    try:
        response = requests.post(
            server,
            data=json.dumps(data),
            timeout=timeout,
        )
        response.raise_for_status()
    except HTTPError as e:
        logger.error(f"HTTP error occurred: {e}",
                     extra=log_extra)
    except requests.RequestException as e:
        # Connection failures and timeouts are handled like HTTP errors so one
        # unreachable server does not abort the whole run.
        logger.error(f"Could not send notification to '{server}': {e}",
                     extra=log_extra)
def write_csv(csv_path, lines):
    """
    Writes the lines to a csv file.

    The file is replaced completely. Errors are logged and not raised.

    :param csv_path: Path of the csv file.
    :param lines: Iterable of rows, each row an iterable of field values.
    :return:
    """
    logger.debug("Writing to csv file.",
                 extra={"csv_path": csv_path, "lines": lines})
    try:
        # newline="" lets the csv module control the line endings. Without it
        # every row ends in "\r\r\n" on Windows, which reads back as an extra
        # empty row.
        with open(csv_path, mode="w", newline="") as file:
            csv.writer(file).writerows(lines)
    except Exception as e:
        logger.error(f"Error writing to csv file: {e}",
                     exc_info=True,
                     extra={"csv_path": csv_path, "lines": lines})
def main():
    """
    Run one appointment check, from configuration to notification.

    Loads and validates the configuration, opens the booking page in headless
    Chrome, selects the configured services and reads the earliest date shown
    for every configured location. Each date is compared with the one stored
    in the csv file; for every changed date a ntfy notification is sent, and
    the resulting state is written back to the csv file.

    Exits with status 1 if a required setting is missing or invalid, or if the
    csv file contains a malformed line.

    :return:
    """
    # Get the configuration
    config = get_config()

    # Set up logging
    setup_logging(config.get("logging"))

    # Set the variables
    url = config.get("url")
    services = config.get("services")
    check_locations = config.get("locations")
    csv_name = config.get("csv_name", "cgn-appointments.csv")
    csv_path = define_csv_path(config.get("csv_path"), csv_name)
    date_regex = config.get("date_regex")
    date_format = config.get("date_format")
    # A missing "ntfy" section must end in the validation errors below, not in
    # an AttributeError on None.
    ntfy_config = config.get("ntfy") or {}
    ntfy_server = ntfy_config.get("server")
    ntfy_topic = ntfy_config.get("topic")
    ntfy_title = ntfy_config.get("title")
    ntfy_message = ntfy_config.get("message")
    ntfy_tags = ntfy_config.get("tags")
    ntfy_priority = ntfy_config.get("priority")

    # Validate data
    if not url or not validate_url(url):
        logger.error(f"Invalid URL '{url}'.")
        raise SystemExit(1)
    if not services:
        logger.error("No services defined.")
        raise SystemExit(1)
    if not check_locations:
        logger.error("No locations defined.")
        raise SystemExit(1)
    if not date_regex:
        logger.error("No date regex defined.")
        raise SystemExit(1)
    try:
        re.compile(date_regex)
    except re.error:
        logger.error(f"Invalid date regex '{date_regex}'.")
        raise SystemExit(1) from None
    if not date_format:
        logger.error("No date format defined.")
        raise SystemExit(1)
    if not ntfy_server or not validate_url(ntfy_server):
        logger.error(f"Invalid ntfy server '{ntfy_server}'.")
        raise SystemExit(1)
    if not ntfy_topic:
        logger.error("No ntfy topic defined.")
        raise SystemExit(1)

    locations = {}
    options = Options()
    options.add_argument("--headless=new")
    driver = webdriver.Chrome(options=options)
    # The browser is closed in the finally clause so it is not left running
    # when one of the scraping steps raises.
    try:
        # Open the website
        logger.debug(f"Opening website: {url}")
        driver.get(url)

        # Start a new session if the previous one expired
        session_expired = driver.find_element(By.ID, "page_title").text == "Ihre Sitzung ist abgelaufen."
        if session_expired:
            logger.debug("Session expired. Starting a new session.")
            new_session_button = driver.find_element(By.CLASS_NAME, "button")
            new_session_button.click()

        # Select services
        selects = driver.find_elements(By.TAG_NAME, 'select')
        select_options(services, selects)

        # Click next button
        next_button = driver.find_element(By.TAG_NAME, "button")
        next_button.click()

        # Wait for the page to load
        logger.debug("Waiting 10 seconds for the page to load...")
        sleep(10)

        # Get location containers
        location_containers = driver.find_elements(By.CLASS_NAME, "location-container")
        for loc in check_locations:
            for location_container in location_containers:
                loc_title = location_container.find_element(By.CLASS_NAME, "location_title")
                if loc in loc_title.text:
                    locations[loc] = {"location_container": location_container}
        if locations:
            logger.debug("Location containers found",
                         extra={"locations": locations})
        else:
            logger.warning("No location containers found.")

        # Get earliest date for each location
        for loc, data in locations.items():
            date_text = data["location_container"].find_element(By.CLASS_NAME, "earliest").text
            found_date = re.search(date_regex, date_text)
            if found_date:
                data["earliest"] = datetime.strptime(found_date.group(), date_format)
            elif "kein Termin gefunden" in date_text:
                data["earliest"] = None
                logger.info(f"No appointment found for {loc}.")
            else:
                logger.warning(f"Could not find date or 'kein Termin gefunden' for {loc}.",
                               extra={"location": loc, "date_text": date_text})

        # Store session URL
        session_url = driver.current_url
    finally:
        driver.quit()
        logger.debug("Browser closed.")

    # Read previous data from file
    csv_path.touch()
    with open(csv_path, mode="r", newline="") as file:
        csv_file = list(csv.reader(file))
    logger.debug("Read csv file",
                 extra={"csv_path": csv_path, "csv_file": csv_file})

    # Append previous data to locations
    for n, line in enumerate(csv_file):
        if not line:
            # Blank rows carry no data; skip them instead of aborting.
            continue
        if len(line) != 2:
            logger.error("Invalid line in csv file.",
                         extra={"line_number": n, "line": line})
            raise SystemExit(1)
        if line[0] in check_locations:
            # setdefault: a stored location may be missing on the page in this
            # run; keep its previous date instead of raising a KeyError.
            locations.setdefault(line[0], {})["previous"] = datetime.strptime(line[1], date_format)

    # Compare previous and new dates and send notification if new date is different
    lines = []
    for name, data in locations.items():
        previous_date = data.get("previous")
        new_date = data.get("earliest")
        if new_date and new_date != previous_date:
            logger.info(f"New appointment found for {name}: {new_date}",
                        extra={"location": name, "previous_date": previous_date,
                               "new_date": new_date})
            lines.append((name, new_date.strftime(date_format)))
            ntfy(
                ntfy_server,
                ntfy_topic,
                ntfy_title,
                ntfy_message % (name, new_date),
                session_url,
                ntfy_tags,
                ntfy_priority,
            )
        elif previous_date is not None:
            lines.append((name, previous_date.strftime(date_format)))

    # Write new data to file
    write_csv(csv_path, lines)
# Script entry point: run main() and turn any unexpected error into a logged
# message plus exit status 1.
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        # SystemExit is not an Exception subclass, so deliberate exits pass
        # through this handler untouched.
        logger.error(f"An error occurred: {e}", exc_info=True)
        # Raise SystemExit directly; the exit() helper is injected by the
        # site module and is not guaranteed to exist (e.g. with python -S).
        raise SystemExit(1)