770 lines
30 KiB
Python
770 lines
30 KiB
Python
#! /app/.venv/bin/python
|
|
|
|
import email.utils
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import smtplib
|
|
import ssl
|
|
from datetime import datetime, date, timedelta
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from email.utils import format_datetime
|
|
|
|
import caldav
|
|
import click
|
|
import pytz
|
|
from caldav import CalendarObjectResource, Principal, Calendar, DAVObject
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from jinja2 import Template
|
|
|
|
from booking import PriorityEventToken, Calendar as RoomBookingCalendar
|
|
from mylogger.logger import setup_logging
|
|
|
|
tz = pytz.timezone(os.getenv("TIME_ZONE", "Europe/Berlin"))
|
|
MAX_SEARCH_HORIZON = int(os.getenv("MAX_SEARCH_HORIZON", 14))
|
|
|
|
FILE_LOG_LEVEL = os.getenv("FILE_LOG_LEVEL", "INFO")
|
|
STDOUT_LOG_LEVEL = os.getenv("STDOUT_LOG_LEVEL", "DEBUG")
|
|
LOG_DIR = os.getenv("LOG_DIR", "/data/room-booking.log.jsonl")
|
|
LOGROTATE_BACKUP_COUNT = int(os.getenv("LOGROTATE_BACKUP_COUNT", 6))
|
|
LOGROTATE_MAX_BYTES = int(os.getenv("LOGROTATE_MAX_BYTES", 10 * 1024 * 1024))
|
|
|
|
|
|
class DavEvent:
|
|
"""
|
|
Wrapper for calendar events fetched from a CalDAV server.
|
|
"""
|
|
uid: str
|
|
name: str
|
|
start: datetime
|
|
end: datetime
|
|
duration: timedelta
|
|
created: datetime
|
|
status: str
|
|
organizer: str
|
|
calendar: DAVObject
|
|
obj: CalendarObjectResource
|
|
missing_required_fields: list
|
|
|
|
def __init__(self, event: CalendarObjectResource):
|
|
"""
|
|
Initialize a DavEvent object.
|
|
:param event: CalendarObjectResource object representing the event.
|
|
"""
|
|
for component in event.icalendar_instance.walk():
|
|
if component.name == "VEVENT":
|
|
|
|
# Check if the event has all required fields
|
|
required = {
|
|
"uid": component.get("uid", False),
|
|
"dtstart": component.get("dtstart", False),
|
|
"dtend": component.get("dtend", False),
|
|
"dtstamp": component.get("dtstamp", False),
|
|
}
|
|
if not all(required.values()):
|
|
self.missing_required_fields = [k for k, v in
|
|
required.items() if
|
|
v is False]
|
|
else:
|
|
self.missing_required_fields = []
|
|
|
|
self.uid = event.icalendar_component.get("uid")
|
|
self.name = event.icalendar_component.get("summary", "No Name")
|
|
self.start = self._handle_date(
|
|
event.icalendar_component.get("dtstart").dt)
|
|
self.end = self._handle_date(
|
|
event.icalendar_component.get("dtend").dt)
|
|
self.duration = self.end - self.start
|
|
self.created = self._handle_date(
|
|
event.icalendar_component.get("dtstamp").dt)
|
|
self.status = event.icalendar_component.get("status",
|
|
"CONFIRMED")
|
|
self.organizer = event.icalendar_component.get("organizer",
|
|
"").replace(
|
|
"mailto:", "").strip()
|
|
self.calendar = event.parent
|
|
self.obj = event
|
|
self.is_deprioritized = False
|
|
self.is_deleted = False
|
|
|
|
def __hash__(self):
|
|
return hash(self.extended_uid)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, DavEvent):
|
|
return NotImplemented
|
|
return self.extended_uid == other.extended_uid
|
|
|
|
@property
|
|
def extended_uid(self):
|
|
"""
|
|
Create a more unique identifier for the event.
|
|
This is necessary to distinguish between events of the same series, which
|
|
have the same UID but different start and end times.
|
|
:return: Unique identifier string
|
|
"""
|
|
# Fallback to calendar id if UID is not set
|
|
uid = self.uid if self.uid else self.calendar.id
|
|
return f"{uid}_{int(self.start.timestamp())}_{int(self.end.timestamp())}"
|
|
|
|
@property
|
|
def is_recurring(self) -> bool:
|
|
"""
|
|
Check if the event is a recurring event.
|
|
:return: True if the event is recurring, False otherwise.
|
|
"""
|
|
return 'RECURRENCE-ID' in self.obj.icalendar_component
|
|
|
|
@property
|
|
def is_cancelled(self) -> bool:
|
|
"""
|
|
Check if the event is a cancelled event.
|
|
:return: True if the event is cancelled, False otherwise.
|
|
"""
|
|
return self.obj.icalendar_component.get("status") == "CANCELLED"
|
|
|
|
@property
|
|
def is_prioritized(self) -> bool:
|
|
"""
|
|
Check if the event is a prioritized event by finding and validating the
|
|
priority token.
|
|
:return:
|
|
"""
|
|
description = self.obj.icalendar_component.get("DESCRIPTION", "")
|
|
uid = self.uid
|
|
|
|
# Check if there is a token in the event description
|
|
token_pattern = r"\#[A-Z0-9]{12}\#"
|
|
match = re.search(token_pattern, str(description))
|
|
if match:
|
|
token = match.group()
|
|
print(f"Priority token found in event description: {token}")
|
|
try:
|
|
token_obj = PriorityEventToken.objects.get(token=token)
|
|
except ObjectDoesNotExist:
|
|
print(f"Priority token could not be found in database: {token}")
|
|
return False
|
|
|
|
# If token is already used, verify signature
|
|
if token_obj.is_used:
|
|
# Check if the signature is valid
|
|
|
|
if token_obj.validate_signature(uid):
|
|
print(f"Priority token '{token}' is valid.")
|
|
return True
|
|
else:
|
|
print(f"Priority token '{token}' is invalid.")
|
|
# TODO: Notify about invalid token usage
|
|
return False
|
|
|
|
# If the token hasn't been used yet, redeem it
|
|
else:
|
|
print(f"Redeeming priority token '{token}' for event '{uid}'.")
|
|
token_obj.redeem(uid)
|
|
# TODO: Notify about token redemption
|
|
return True
|
|
|
|
# If no token is found, return False
|
|
return False
|
|
|
|
def delete(self):
|
|
"""
|
|
Delete the event from the calendar.
|
|
:return:
|
|
"""
|
|
self.is_deleted = True
|
|
self.obj.delete()
|
|
|
|
def cancel(self):
|
|
"""
|
|
Cancel the event.
|
|
:return:
|
|
"""
|
|
self.obj.icalendar_component["status"] = "CANCELLED"
|
|
self.obj.save(no_create=True, increase_seqno=False)
|
|
|
|
def dumps(self, datetime_format="%Y-%m-%d %X") -> dict:
|
|
"""
|
|
Dump a dictionary with the event data.
|
|
:param datetime_format: Format for datetime fields
|
|
:return: string with json data
|
|
"""
|
|
return {
|
|
"uid": self.uid,
|
|
"name": self.name,
|
|
"start": self.start.strftime(datetime_format),
|
|
"end": self.end.strftime(datetime_format),
|
|
"duration": self.duration.total_seconds(),
|
|
"created": self.created.strftime(datetime_format),
|
|
"status": self.status,
|
|
"organizer": self.organizer,
|
|
"calendar_id": self.calendar.id,
|
|
"calendar_name": self.calendar.name,
|
|
"is_cancelled": self.is_cancelled,
|
|
"is_recurring": self.is_recurring,
|
|
"is_prioritized": self.is_prioritized,
|
|
"missing_required_fields": self.missing_required_fields,
|
|
}
|
|
|
|
@staticmethod
|
|
def _handle_date(date_value: datetime | date) -> datetime:
|
|
"""
|
|
Date objects will be converted to datetime objects in the specified timezone.
|
|
"""
|
|
if isinstance(date_value, datetime):
|
|
date_value = date_value.astimezone(tz)
|
|
else:
|
|
date_value = tz.localize(
|
|
datetime.combine(date_value, datetime.min.time()))
|
|
|
|
return date_value
|
|
|
|
|
|
@click.command()
|
|
@click.option("--calendars", "-c", multiple=True, required=False,
|
|
help="Calendars to check for collisions (can be specified multiple times).")
|
|
@click.option("--dry-run", is_flag=True,
|
|
help="Run in test mode (do not delete events, just print what would be deleted).",
|
|
default=False)
|
|
def clear_bookings(calendars: list, dry_run: bool = False):
|
|
"""
|
|
Clear colliding and cancelled events in calendars.
|
|
:param calendars: List of calendars to check for collisions.
|
|
:param dry_run: Do not delete events, just print what would be deleted.
|
|
"""
|
|
|
|
logger = logging.getLogger(__name__)
|
|
setup_logging(file_log_level=FILE_LOG_LEVEL,
|
|
stdout_log_level=STDOUT_LOG_LEVEL,
|
|
logdir=LOG_DIR)
|
|
|
|
logger.debug(f"Clear bookings.{' Test mode enabled.' if dry_run else ''}")
|
|
# Fetch calendar objects from database
|
|
if len(calendars) > 0:
|
|
calendars_found = list(
|
|
RoomBookingCalendar.objects.filter(name__in=calendars))
|
|
for cal in calendars:
|
|
if not cal in [c.name for c in calendars_found]:
|
|
logger.debug(f"Calendar '{cal}' not found in database. Skipping.")
|
|
else:
|
|
calendars_found = list(RoomBookingCalendar.objects.all())
|
|
|
|
if not calendars_found:
|
|
logger.debug("No valid calendars found. Exiting.")
|
|
exit(0)
|
|
|
|
calendars = calendars_found
|
|
|
|
dav_client = caldav.DAVClient(
|
|
url=calendars[0].url,
|
|
username=calendars[0].username,
|
|
password=calendars[0].calendar_password,
|
|
)
|
|
|
|
principal = dav_client.principal()
|
|
|
|
# Filter calendars to only those that are in the calendars list
|
|
# Note: The id of the objects returned by principal.calendars() corresponds
|
|
# to the name f the Django Calendar model instances.
|
|
tcal_by_name = {c.name: c for c in calendars}
|
|
calendars_filtered = [cal for cal in principal.calendars()
|
|
if cal.id in tcal_by_name.keys()]
|
|
|
|
if not calendars_filtered:
|
|
logger.debug("No calendars to clear collisions in. Exiting.")
|
|
exit(0)
|
|
|
|
for calendar in calendars_filtered:
|
|
|
|
# Get events from calendar
|
|
logger.debug(
|
|
f"Clearing cancelled bookings and collisions in calendar: {calendar.id}")
|
|
horizon = tcal_by_name[calendar.id].auto_clear_collision_horizon_days
|
|
|
|
# Split horizon search into multiple requests if horizon is bigger
|
|
# than MAX_SEARCH_HORIZON
|
|
horizons = []
|
|
while horizon > 0:
|
|
if horizon >= MAX_SEARCH_HORIZON:
|
|
horizons.append(MAX_SEARCH_HORIZON)
|
|
else:
|
|
horizons.append(horizon)
|
|
horizon -= MAX_SEARCH_HORIZON
|
|
|
|
events_fetched = []
|
|
start_delta = 0
|
|
end_delta = 0
|
|
today = datetime.now(tz=tz).date()
|
|
for h in horizons:
|
|
end_delta += h
|
|
try:
|
|
events_fetched.extend(calendar.search(
|
|
start=today + timedelta(days=start_delta),
|
|
end=today + timedelta(days=end_delta),
|
|
event=True,
|
|
expand=True,
|
|
split_expanded=True,
|
|
))
|
|
except Exception as e:
|
|
logger.exception(
|
|
f"Failed to fetch events for calendar: {calendar.id}: {e}",
|
|
extra={
|
|
"calendar": calendar.id,
|
|
"max_search_horizon": MAX_SEARCH_HORIZON,
|
|
"username": calendars[0].username,
|
|
})
|
|
start_delta += h
|
|
|
|
# Create DavEvent objects from fetched events
|
|
events = []
|
|
for event in events_fetched:
|
|
try:
|
|
events.append(DavEvent(event))
|
|
except ValueError as e:
|
|
logger.exception(f"Could not create DavEvent object.",
|
|
exc_info=e, extra={
|
|
"event": str(event)
|
|
})
|
|
continue
|
|
|
|
# Filter out events that are missing required fields
|
|
events_missing_required_fields = [
|
|
event for event in events if event.missing_required_fields
|
|
]
|
|
for event in events_missing_required_fields:
|
|
event_result = {
|
|
"uid": event.uid or "No UID",
|
|
"name": event.name or "No Name",
|
|
"reason": f"Missing required fields: {', '.join(event.missing_required_fields)}",
|
|
"calendar": calendar.id,
|
|
}
|
|
logger.warning("Skipping event because missing required fields",
|
|
extra=event_result)
|
|
continue
|
|
events = [event for event in events if
|
|
not event.missing_required_fields]
|
|
|
|
# Delete cancelled non-recurring events if not in dry_run mode
|
|
for event in events:
|
|
if event.is_cancelled and not event.is_recurring:
|
|
if not dry_run:
|
|
event.obj.delete()
|
|
event_result = {
|
|
"uid": event.uid,
|
|
"name": event.name or "No Name",
|
|
"calendar": event.calendar.id,
|
|
}
|
|
logger.info("Deleted cancelled event.", extra=event_result)
|
|
|
|
# Find collisions
|
|
collisions = find_collisions(events)
|
|
logger.info(
|
|
f"Found {len(collisions)} event collisions.", extra={
|
|
"calendar": calendar.id,
|
|
"collisions": [{"event": ov["event"].dumps(),
|
|
"collides_with": [cw.dumps() for cw in
|
|
ov['collides_with']]} for ov
|
|
in collisions.values()],
|
|
})
|
|
|
|
# Delete colliding events and send emails to organizers
|
|
for collision in collisions.values():
|
|
event = collision["event"]
|
|
collides_with = collision["collides_with"]
|
|
event.is_deprioritized = any(
|
|
ov.is_prioritized for ov in collides_with)
|
|
collision_result = {
|
|
"event": event.dumps(),
|
|
"collides_with": [ov.dumps() for ov in collides_with],
|
|
"calendar": calendar.id,
|
|
}
|
|
try:
|
|
# If this is not a recurring event, we can delete it directly
|
|
if not event.is_recurring:
|
|
# but only if we are not in dry_run mode
|
|
if not dry_run:
|
|
event.delete()
|
|
collision_result["cancellation_type"] = "deleted"
|
|
|
|
# If this is a recurring event, and not already cancelled,
|
|
# we need to cancel it now
|
|
elif not event.is_cancelled:
|
|
if not dry_run:
|
|
event.cancel()
|
|
collision_result["cancellation_type"] = "cancelled"
|
|
|
|
# Get all events of the affected calendar for the days of the
|
|
# declined event
|
|
other_bookings = find_other_bookings(event, calendar)
|
|
|
|
# Find free alternative rooms for the time slot of the declined event
|
|
alternatives = find_alternatives(event, principal, calendars)
|
|
|
|
# Create a set for all events as an overview
|
|
event.is_declined = True
|
|
event.is_collision = False
|
|
event.is_other_booking = False
|
|
overview = {event}
|
|
|
|
# Add the colliding events to the overview
|
|
for cw in collides_with:
|
|
cw.is_declined = False
|
|
cw.is_collision = True
|
|
cw.is_other_booking = False
|
|
overview.add(cw)
|
|
|
|
# Add the other bookings to the overview
|
|
for ob in other_bookings:
|
|
if not ob.is_cancelled:
|
|
ob.is_declined = False
|
|
ob.is_collision = False
|
|
ob.is_other_booking = True
|
|
overview.add(ob)
|
|
|
|
# Make a list from the set to ensure order
|
|
overview = list(overview)
|
|
|
|
# Sort the overview by start time
|
|
overview.sort(key=lambda ev: ev.start)
|
|
|
|
# Send email to organizer of the event if not in dry_run mode
|
|
email_template = tcal_by_name[calendar.id].email_template
|
|
if email_template:
|
|
try:
|
|
send_mail_to_organizer(
|
|
booking=event,
|
|
colliding_bookings=collides_with,
|
|
event_overview=overview,
|
|
other_bookings=other_bookings,
|
|
alternatives=alternatives,
|
|
calendar=calendar,
|
|
email_template=email_template,
|
|
is_test=dry_run,
|
|
)
|
|
collision_result["email_sent"] = True
|
|
except Exception as e:
|
|
logger.exception(
|
|
"Failed to send email to organizer"
|
|
f" for event '{event.name}'",
|
|
exc_info=e,
|
|
extra={
|
|
"calendar": calendar.id,
|
|
"event": event.dumps(),
|
|
"colliding_with": [cw.dumps() for cw in
|
|
collides_with],
|
|
"overview": [ov.dumps() for ov in overview],
|
|
"alternatives": [cal.id for cal in alternatives],
|
|
"email_template": email_template.id,
|
|
})
|
|
collision_result["email_sent"] = False
|
|
else:
|
|
logger.debug(
|
|
f"No email template found for calendar '{calendar.id}'. "
|
|
"Skipping email sending.",
|
|
extra={"calendar": calendar.id}
|
|
)
|
|
|
|
collision_result["email_sent"] = False
|
|
|
|
# Log results
|
|
logger.info("Processed colliding event.",
|
|
extra=collision_result)
|
|
|
|
except Exception as e:
|
|
logger.exception(
|
|
f"Failed to process colliding event '{event.uid}'.",
|
|
exc_info=e, extra={
|
|
"calendar": calendar.id,
|
|
"event": event.dumps(),
|
|
"collides_with": [ov.dumps() for ov in collides_with],
|
|
"dry_run": dry_run,
|
|
})
|
|
print("------")
|
|
continue
|
|
|
|
logger.debug(f"Clearing completed.{' Test mode enabled.' if dry_run else ''}")
|
|
|
|
|
|
def find_collisions(events: list[DavEvent]) -> dict:
|
|
"""
|
|
Find colliding events.
|
|
:param events: List of events to check for collisions.
|
|
:return: Dictionary of colliding events with their UIDs as keys and
|
|
a dictionary containing the event and the event it collides with
|
|
as values.
|
|
"""
|
|
logger = logging.getLogger(__name__)
|
|
colliding_events = {}
|
|
|
|
# Order events by created time
|
|
events = sorted(events, key=lambda item: item.created)
|
|
|
|
# Define lambda functions to check for collisions
|
|
event_starts_later = lambda e1, e2: e1.end >= e2.end > e1.start
|
|
event_starts_earlier = lambda e1, e2: e1.end > e2.start >= e1.start
|
|
event_inbetween = lambda e1, e2: e2.start < e1.start < e1.end < e2.end
|
|
|
|
# Find colliding events
|
|
for event in events:
|
|
|
|
# Skip if the event is already in the colliding events dictionary
|
|
# or if it is already canceled
|
|
if colliding_events.get(event.extended_uid) or event.is_cancelled:
|
|
continue
|
|
|
|
for compare_event in events:
|
|
# Skip if the compare_event is same event or belongs to the same
|
|
# series as the compare event
|
|
if event.uid == compare_event.uid:
|
|
continue
|
|
|
|
# Skip if the compare_event is already canceled
|
|
if compare_event.is_cancelled:
|
|
continue
|
|
|
|
# Check if the events collide
|
|
if event_starts_later(event, compare_event) or \
|
|
event_starts_earlier(event, compare_event) or \
|
|
event_inbetween(event, compare_event):
|
|
|
|
# If both events are prioritized, skip the collision
|
|
if compare_event.is_prioritized and event.is_prioritized:
|
|
logger.info("Skipping collisions detection between "
|
|
"two prioritized events "
|
|
f"'{event.name}' and '{compare_event.name}'",
|
|
extra={"event_1": event.dumps(),
|
|
"event_2": compare_event.dumps()})
|
|
continue
|
|
|
|
# If the compare_event is prioritized and the event is not,
|
|
# skip the collision
|
|
elif compare_event.is_prioritized and not event.is_prioritized:
|
|
logger.debug("Skipping collisions detection between "
|
|
f"event '{event.name}' and prioritized "
|
|
f"event '{compare_event.name}'",
|
|
extra={"event_1": event.dumps(),
|
|
"event_2": compare_event.dumps()})
|
|
continue
|
|
|
|
# Add to colliding events dictionary
|
|
if e := colliding_events.get(compare_event.extended_uid):
|
|
# If the event is already in the dictionary, extend the collisions
|
|
e["collides_with"].append(event)
|
|
else:
|
|
# Create a new entry for the colliding event
|
|
colliding_events[compare_event.extended_uid] = {
|
|
"event": compare_event,
|
|
"collides_with": [event]
|
|
}
|
|
|
|
return colliding_events
|
|
|
|
|
|
def send_mail_to_organizer(
|
|
booking: DavEvent,
|
|
colliding_bookings: list[DavEvent],
|
|
event_overview: list[DavEvent],
|
|
calendar: caldav.Calendar,
|
|
email_template,
|
|
other_bookings: list[DavEvent] = None,
|
|
alternatives: list[Calendar] = None,
|
|
is_test: bool = False
|
|
):
|
|
"""
|
|
Send email to organizer of the event.
|
|
:param booking: Booking that was declined
|
|
:param colliding_bookings: List of bookings which collide with the declined event
|
|
:param event_overview: Sorted list of all events in the calendar, including the declined event and collisions
|
|
:param calendar: Calendar to send the email from
|
|
another event, False otherwise
|
|
:param email_template: Email template to use for the email
|
|
:param other_bookings: List of other bookings at the same day(s) as the declined booking
|
|
:param alternatives: List of alternative calendars (rooms) for the time slot of the declined booking
|
|
:param is_test: If True, do not send the email, just print it
|
|
"""
|
|
logger = logging.getLogger(__name__)
|
|
|
|
if is_test:
|
|
logger.debug("Not sending email in test mode.")
|
|
|
|
# Check if environment variables for SMTP are set
|
|
if not all([
|
|
os.getenv('SMTP_EMAIL'),
|
|
os.getenv('SMTP_PASSWORD'),
|
|
os.getenv('SMTP_SERVER'),
|
|
]):
|
|
raise Exception("SMTP environment variables are not set.")
|
|
|
|
recipient = booking.organizer
|
|
|
|
# Do not send email if recipient is empty
|
|
if not recipient or recipient == "":
|
|
logger.debug("Not sending email to empty recipient.")
|
|
return
|
|
|
|
# Prepare the email content
|
|
context = {
|
|
"booking": booking,
|
|
"colliding_bookings": colliding_bookings,
|
|
"other_bookings": other_bookings if other_bookings else [],
|
|
"overview": event_overview,
|
|
"alternatives": alternatives if alternatives else [],
|
|
"calendar_name": calendar.get_display_name(),
|
|
}
|
|
|
|
recipients = [recipient]
|
|
|
|
# Create the email message
|
|
message = MIMEMultipart("alternative")
|
|
message["From"] = email.utils.formataddr(
|
|
(os.getenv('SMTP_SENDER_NAME', 'Room Booking'),
|
|
os.getenv('SMTP_EMAIL')))
|
|
message["To"] = email.utils.formataddr((None, recipient))
|
|
if bcc := os.getenv("SMTP_BCC"):
|
|
message["Bcc"] = bcc
|
|
recipients += bcc.split(",")
|
|
message["Date"] = format_datetime(datetime.now().astimezone(tz))
|
|
|
|
# Try to render subject and body from the email template
|
|
message["Subject"] = Template(email_template.subject).render(**context)
|
|
plaintext = Template(email_template.plaintext).render(**context)
|
|
html = Template(email_template.html).render(**context)
|
|
message.attach(MIMEText(plaintext, "plain", "utf-8"))
|
|
message.attach(MIMEText(html, "html", "utf-8"))
|
|
|
|
# Create a secure SSL context
|
|
ssl_context = ssl.create_default_context()
|
|
|
|
# Send the email with starttls or SSL based on environment variable
|
|
logger.info(f"Sending email.",
|
|
extra={
|
|
"to": recipients,
|
|
"subject": message["Subject"],
|
|
"calendar": calendar.id,
|
|
"email_template": email_template.id,
|
|
"plaintext": plaintext,
|
|
"html": html,
|
|
"is_test": is_test,
|
|
})
|
|
if os.getenv('SMTP_STARTTLS', False):
|
|
logger.debug("Using STARTTLS for SMTP connection.")
|
|
if not is_test:
|
|
with smtplib.SMTP(os.getenv('SMTP_SERVER'),
|
|
os.getenv('SMTP_PORT', 587)) as server:
|
|
server.starttls(context=ssl_context)
|
|
server.login(os.getenv(" ", os.getenv("SMTP_EMAIL")),
|
|
os.getenv("SMTP_PASSWORD"))
|
|
server.sendmail(os.getenv("SMTP_EMAIL"), recipients,
|
|
message.as_string())
|
|
else:
|
|
logger.debug("Using SSL for SMTP connection.")
|
|
if not is_test:
|
|
with smtplib.SMTP_SSL(os.getenv('SMTP_SERVER'),
|
|
os.getenv('SMTP_PORT', 465),
|
|
context=ssl_context) as server:
|
|
server.login(
|
|
os.getenv("SMTP_USER_NAME", os.getenv("SMTP_EMAIL")),
|
|
os.getenv("SMTP_PASSWORD"))
|
|
server.sendmail(os.getenv("SMTP_EMAIL"), recipients,
|
|
message.as_string())
|
|
|
|
|
|
def find_other_bookings(event: DavEvent, calendar: caldav.Calendar) -> list[
|
|
DavEvent]:
|
|
"""
|
|
Find other bookings in the calendar that are at the same day(s) as the declined event.
|
|
:param event: The event to check for collisions.
|
|
:param calendar: The calendar to search in.
|
|
:return: List of DavEvent objects.
|
|
"""
|
|
# Get all events in the calendar for the days of the declined event
|
|
start_date = event.start.date()
|
|
end_date = event.end.date()
|
|
events_fetched = calendar.search(
|
|
start=start_date,
|
|
end=end_date + timedelta(days=1),
|
|
event=True,
|
|
expand=True,
|
|
split_expanded=True,
|
|
)
|
|
|
|
# Create DavEvent objects from fetched events
|
|
all_events = [DavEvent(event) for event in events_fetched]
|
|
|
|
# Filter out cancelled events and events that are
|
|
# - the exact same as the given event
|
|
# - that do not have all required fields
|
|
return [e for e in all_events if not e.is_cancelled
|
|
and e.extended_uid != event.extended_uid
|
|
and not e.missing_required_fields]
|
|
|
|
|
|
def find_alternatives(event: DavEvent, principal: Principal,
|
|
target_calendars: list) -> list[Calendar]:
|
|
"""
|
|
Find alternative rooms for the time slot of the declined event.
|
|
:param event: The event to find alternatives for.
|
|
:param principal: The principal to search for alternative rooms.
|
|
:param target_calendars: Calendars administrated by the current user
|
|
:return: List of alternative calendars that are available for the time slot.
|
|
"""
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Get all calendars of the principal
|
|
calendars = principal.calendars()
|
|
alternative_calendars = {
|
|
calendar.name: [c.name for c in calendar.alternatives.all()]
|
|
for calendar in target_calendars
|
|
}
|
|
logger.debug("Alternative calendars for event calendar: "
|
|
f"{', '.join([a for a in alternative_calendars[event.obj.parent.id]])}")
|
|
|
|
alternatives = []
|
|
for calendar in calendars:
|
|
# Skip the calendar of the declined event
|
|
if calendar.id == event.obj.parent.id:
|
|
continue
|
|
|
|
# Skip if the calendar is not administrated by the current user
|
|
if calendar.id not in alternative_calendars.keys():
|
|
continue
|
|
|
|
# Skip if the calendar is not an alternative to the calendar of the
|
|
# declined event
|
|
if calendar.id not in alternative_calendars[event.obj.parent.id]:
|
|
continue
|
|
|
|
# Search for events in the alternative calendar that collide with the
|
|
# declined event
|
|
events_fetched = calendar.search(
|
|
start=event.start,
|
|
end=event.end,
|
|
event=True,
|
|
expand=True,
|
|
split_expanded=True,
|
|
)
|
|
|
|
# Create DavEvent objects from fetched events
|
|
all_events = [DavEvent(event) for event in events_fetched]
|
|
|
|
# Filter out cancelled events and events that are missing required fields
|
|
blocked = [e for e in all_events if not e.is_cancelled
|
|
and not e.missing_required_fields]
|
|
|
|
# If there are no blocked events, the calendar is available
|
|
if not blocked:
|
|
alternatives.append(calendar)
|
|
|
|
logger.debug("Found available alternatives for event.",
|
|
extra={
|
|
"event": event.dumps(),
|
|
"alternatives": [a.id for a in alternatives],
|
|
})
|
|
return alternatives
|
|
|
|
|
|
if __name__ == "__main__":
|
|
clear_bookings()
|