# room-booking/src/clear_bookings.py
#
# 685 lines
# 26 KiB
# Python

import email.utils
import json
import os
import re
import smtplib
import ssl
from datetime import datetime, date, timedelta
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import format_datetime
from pprint import pprint
import caldav
import pytz
from caldav import CalendarObjectResource, Principal, Calendar, DAVObject
from jinja2 import Template
from django.core.exceptions import ObjectDoesNotExist
# Time zone that event dates are converted to. The zone name is read from the
# TIME_ZONE environment variable and defaults to "Europe/Berlin".
tz = pytz.timezone(os.getenv("TIME_ZONE", "Europe/Berlin"))
# Maximum number of days covered by a single CalDAV search request in clear();
# longer horizons are split into several requests. Read from the
# MAX_SEARCH_HORIZON environment variable, default 14.
MAX_SEARCH_HORIZON = int(os.getenv("MAX_SEARCH_HORIZON", 14))
class DavEvent:
"""
Wrapper for calendar events fetched from a CalDAV server.
"""
uid: str
name: str
start: datetime
end: datetime
duration: timedelta
created: datetime
status: str
organizer: str
calendar: DAVObject
obj: CalendarObjectResource
missing_required_fields: list
def __init__(self, event: CalendarObjectResource):
"""
Initialize a DavEvent object.
:param event: CalendarObjectResource object representing the event.
"""
for component in event.icalendar_instance.walk():
if component.name == "VEVENT":
# Check if the event has all required fields
required = {
"uid": component.get("uid", False),
"dtstart": component.get("dtstart", False),
"dtend": component.get("dtend", False),
"dtstamp": component.get("dtstamp", False),
}
if not all(required.values()):
self.missing_required_fields = [k for k, v in required.items() if v is False]
else:
self.missing_required_fields = []
self.uid = event.icalendar_component.get("uid")
self.name = event.icalendar_component.get("summary", "No Name")
self.start = self._handle_date(event.icalendar_component.get("dtstart").dt)
self.end = self._handle_date(event.icalendar_component.get("dtend").dt)
self.duration = self.end - self.start
self.created = self._handle_date(event.icalendar_component.get("dtstamp").dt)
self.status = event.icalendar_component.get("status", "CONFIRMED")
self.organizer = event.icalendar_component.get("organizer", "").replace("mailto:", "").strip()
self.calendar = event.parent
self.obj = event
self.is_deprioritized = False
self.is_deleted = False
def __hash__(self):
return hash(self.extended_uid)
def __eq__(self, other):
if not isinstance(other, DavEvent):
return NotImplemented
return self.extended_uid == other.extended_uid
@property
def extended_uid(self):
"""
Create a more unique identifier for the event.
This is necessary to distinguish between events of the same series, which
have the same UID but different start and end times.
:return: Unique identifier string
"""
# Fallback to calendar id if UID is not set
uid = self.uid if self.uid else self.calendar.id
return f"{uid}_{int(self.start.timestamp())}_{int(self.end.timestamp())}"
@property
def is_recurring(self) -> bool:
"""
Check if the event is a recurring event.
:return: True if the event is recurring, False otherwise.
"""
return 'RECURRENCE-ID' in self.obj.icalendar_component
@property
def is_cancelled(self) -> bool:
"""
Check if the event is a cancelled event.
:return: True if the event is cancelled, False otherwise.
"""
return self.obj.icalendar_component.get("status") == "CANCELLED"
@property
def is_prioritized(self) -> bool:
"""
Check if the event is a prioritized event by finding and validating the
priority token.
:return:
"""
from booking import PriorityEventToken
description = self.obj.icalendar_component.get("DESCRIPTION", "")
uid = self.uid
# Check if there is a token in the event description
token_pattern = r"\#[A-Z0-9]{12}\#"
match = re.search(token_pattern, str(description))
if match:
token = match.group()
print(f"Priority token found in event description: {token}")
try:
token_obj = PriorityEventToken.objects.get(token=token)
except ObjectDoesNotExist:
print(f"Priority token could not be found in database: {token}")
return False
# If token is already used, verify signature
if token_obj.is_used:
# Check if the signature is valid
if token_obj.validate_signature(uid):
print(f"Priority token '{token}' is valid.")
return True
else:
print(f"Priority token '{token}' is invalid.")
# TODO: Notify about invalid token usage
return False
# If the token hasn't been used yet, redeem it
else:
print(f"Redeeming priority token '{token}' for event '{uid}'.")
token_obj.redeem(uid)
# TODO: Notify about token redemption
return True
# If no token is found, return False
return False
def delete(self):
"""
Delete the event from the calendar.
:return:
"""
self.is_deleted = True
self.obj.delete()
def cancel(self):
"""
Cancel the event.
:return:
"""
self.obj.icalendar_component["status"] = "CANCELLED"
self.obj.save(no_create=True, increase_seqno=False)
def serialize(self) -> dict[str, str | datetime | timedelta]:
"""
Serialize the event to a dictionary.
:return:
"""
return {
"uid": self.uid,
"name": self.name,
"start": self.start,
"end": self.end,
"duration": self.duration,
"created": self.created,
"status": self.status,
"organizer": self.organizer,
"calendar_id": self.calendar.id,
"calendar_name": self.calendar.name,
"is_cancelled": self.is_cancelled,
"is_recurring": self.is_recurring,
"is_prioritized": self.is_prioritized,
"missing_required_fields": self.missing_required_fields,
}
def dumps(self, indent=4, datetime_format="%Y-%m-%d %X") -> str:
"""
Dump a json string with the event data.
:return: string with json data
"""
return json.dumps({
"uid": self.uid,
"name": self.name,
"start": self.start.strftime(datetime_format),
"end": self.end.strftime(datetime_format),
"duration": self.duration.total_seconds(),
"created": self.created.strftime(datetime_format),
"status": self.status,
"organizer": self.organizer,
"calendar_id": self.calendar.id,
"calendar_name": self.calendar.name,
"is_cancelled": self.is_cancelled,
"is_recurring": self.is_recurring,
"is_prioritized": self.is_prioritized,
"missing_required_fields": self.missing_required_fields,
}, indent=indent)
@staticmethod
def _handle_date(date_value: datetime | date) -> datetime:
"""
Date objects will be converted to datetime objects in the specified timezone.
"""
if isinstance(date_value, datetime):
date_value = date_value.astimezone(tz)
else:
date_value = tz.localize(datetime.combine(date_value, datetime.min.time()))
return date_value
def _search_windows(horizon_days, max_window_days):
    """Split ``horizon_days`` into consecutive (start, end) day offsets of at most ``max_window_days``."""
    # Guard against a non-positive window size, which would never terminate.
    step = max(1, max_window_days)
    return [
        (start, min(start + step, horizon_days))
        for start in range(0, horizon_days, step)
    ]


def _fetch_events(calendar, horizon_days):
    """Fetch the expanded events of ``calendar`` for the next ``horizon_days`` days, one request per window."""
    today = datetime.now(tz=tz).date()
    events_fetched = []
    for start_delta, end_delta in _search_windows(horizon_days, MAX_SEARCH_HORIZON):
        try:
            events_fetched.extend(calendar.search(
                start=today + timedelta(days=start_delta),
                end=today + timedelta(days=end_delta),
                event=True,
                expand=True,
                split_expanded=True,
            ))
        except Exception as e:
            # A failed window is skipped; the following windows keep their
            # own boundaries so no request grows beyond the limit.
            print(f"--- Failed to fetch events for calendar: {calendar.id}: {e}")
    return events_fetched


def _notify_organizer(event, overlaps_with, calendar, principal,
                      target_calendars, email_template, is_test):
    """Build the booking overview for a declined event and mail it to the organizer; return True if sent."""
    # The event has already been deleted/cancelled at this point, so a
    # failing lookup must not prevent the organizer from being informed.
    try:
        other_bookings = find_other_bookings(event, calendar)
    except Exception as e:
        print(f"Failed to fetch other bookings for event {event.uid}: {e}")
        other_bookings = []
    try:
        alternatives = find_alternatives(event, principal, target_calendars)
    except Exception as e:
        print(f"Failed to find alternatives for event {event.uid}: {e}")
        alternatives = []

    # Collect all events in a set as an overview; the flags describe the role
    # of each event relative to the declined one.
    event.is_declined = True
    event.is_overlapping = False
    event.is_other_booking = False
    overview = {event}
    for ov in overlaps_with:
        ov.is_declined = False
        ov.is_overlapping = True
        ov.is_other_booking = False
        overview.add(ov)
    for ob in other_bookings:
        ob.is_declined = False
        ob.is_overlapping = False
        ob.is_other_booking = True
        overview.add(ob)
    overview = sorted(overview, key=lambda ev: ev.start)

    if not email_template:
        print(f"No email template found for calendar {calendar.id}. "
              "Skipping email sending.")
        return False
    try:
        send_mail_to_organizer(
            booking=event,
            overlap_bookings=overlaps_with,
            event_overview=overview,
            other_bookings=other_bookings,
            alternatives=alternatives,
            calendar=calendar,
            email_template=email_template,
            is_test=is_test,
        )
    except Exception as e:
        print("Failed to send email to organizer for event "
              f"{event.name}: {e}")
        return False
    return True


def clear(target_calendars: list, is_test: bool = False) -> dict:
    """
    Clear overlapping and cancelled events in calendars.

    For every target calendar found on the server, cancelled non-recurring
    events are deleted, overlapping events are deleted (or cancelled if they
    belong to a recurring series) and the organizer of each declined event is
    notified by email if the calendar has an email template.

    :param target_calendars: List of calendars to check for overlaps. The
        connection data of the first calendar is used for the CalDAV client.
    :param is_test: Do not delete events, just print what would be deleted.
    :return: Dictionary with results of the operation.
    """
    print(f"--- Clear bookings.{' Test mode enabled.' if is_test else ''}")
    results = {
        "deleted_cancelled_events": [],
        "deleted_overlapping_events": [],
        "cancelled_overlapping_recurring_events": [],
        "ignored_overlapping_events": [],
        "test_mode": is_test,
    }
    # Without any target calendar there is no server to connect to.
    if not target_calendars:
        print("--- No calendars to clear overlaps in. Exiting.")
        return results

    dav_client = caldav.DAVClient(
        url=target_calendars[0].url,
        username=target_calendars[0].username,
        password=target_calendars[0].calendar_password,
    )
    principal = dav_client.principal()
    # Filter calendars to only those that are in the target_calendars list
    # Note: The id of the objects returned by principal.calendars() corresponds
    # to the name of the Django Calendar model instances.
    tcal_by_name = {c.name: c for c in target_calendars}
    calendars = [cal for cal in principal.calendars() if cal.id in tcal_by_name]
    if not calendars:
        print("--- No calendars to clear overlaps in. Exiting.")
        return results

    for calendar in calendars:
        print(f"--- Clearing cancelled bookings and overlaps in calendar: {calendar.id}")
        target_calendar = tcal_by_name[calendar.id]
        events_fetched = _fetch_events(
            calendar, target_calendar.auto_clear_overlap_horizon_days
        )

        # Create DavEvent objects from fetched events; a single malformed
        # event must not abort the whole run.
        events = []
        for fetched in events_fetched:
            try:
                events.append(DavEvent(fetched))
            except Exception as e:
                print(f"Error creating DavEvent object: {e}")

        # Report and drop events that are missing required fields
        for event in events:
            if not event.missing_required_fields:
                continue
            result = {
                "uid": event.uid or "No UID",
                "name": event.name or "No Name",
                "reason": f"Missing required fields: {', '.join(event.missing_required_fields)}",
                "calendar": calendar.id,
            }
            results["ignored_overlapping_events"].append(result)
            print("Skipping event:")
            pprint(result)
            print("------")
        events = [event for event in events if not event.missing_required_fields]
        # An event spanning the boundary of two search windows is returned by
        # both requests; keep only the first occurrence (DavEvent hashes by
        # extended_uid).
        events = list(dict.fromkeys(events))

        # Delete cancelled non-recurring events if not in test mode
        for event in events:
            if not event.is_cancelled or event.is_recurring:
                continue
            if not is_test:
                try:
                    event.delete()
                except Exception as e:
                    print(f"Failed to delete cancelled event {event.uid}: {e}")
                    print("------")
                    continue
            result = {
                "uid": event.uid,
                "name": event.name or "No Name",
                "calendar": event.calendar.id,
            }
            results["deleted_cancelled_events"].append(result)
            print("Deleted cancelled event:")
            pprint(result)
            print("------")

        # Find overlapping events
        overlapping_events = find_overlapping_events(events)
        overlapping_events_json = json.dumps(
            [json.loads(o.get("event").dumps()) for o in overlapping_events.values()],
            indent=2,
        )
        print(f"Found overlapping events:\n{overlapping_events_json}")

        # Delete overlapping events and send emails to organizers
        email_template = target_calendar.email_template
        for overlap in overlapping_events.values():
            event = overlap["event"]
            overlaps_with = overlap["overlaps_with"]
            event.is_deprioritized = any(ov.is_prioritized for ov in overlaps_with)
            result = {
                "event": event.serialize(),
                "overlaps_with": [ov.serialize() for ov in overlaps_with],
                "calendar": calendar.id,
            }
            try:
                if not event.is_recurring:
                    # Non-recurring events can be deleted directly, but only
                    # if we are not in test mode
                    if not is_test:
                        event.delete()
                    result["cancellation_type"] = "deleted"
                    results["deleted_overlapping_events"].append(result)
                    print("Deleted overlapping event:")
                elif not event.is_cancelled:
                    # Occurrences of a recurring event that are not already
                    # cancelled are cancelled instead of deleted
                    if not is_test:
                        event.cancel()
                    result["cancellation_type"] = "cancelled"
                    results["cancelled_overlapping_recurring_events"].append(result)
                    print("Cancelled overlapping recurring event:")
            except Exception as e:
                print(f"Failed to delete event {event.uid}: {e}")
                print("------")
                continue

            try:
                result["email_sent"] = _notify_organizer(
                    event, overlaps_with, calendar, principal,
                    target_calendars, email_template, is_test,
                )
            except Exception as e:
                print(f"Failed to notify organizer for event {event.uid}: {e}")
                result["email_sent"] = False
            # Print results
            pprint(result)
            print("------")

    print(f"--- Clearing completed.{' Test mode enabled.' if is_test else ''}")
    return results
def find_overlapping_events(events: "list[DavEvent]") -> dict:
    """
    Find overlapping events.

    Events are processed in order of their creation time. An event that is
    neither cancelled nor already declined declines every overlapping event
    of a different UID, unless that other event is prioritized.

    :param events: List of events to check for overlaps.
    :return: Dictionary keyed by the extended UID of each declined event. Each
             value is a dictionary with the declined event (``event``) and
             the list of events it overlaps with (``overlaps_with``).
    """

    def _overlaps(e1, e2) -> bool:
        return (
            e1.end >= e2.end > e1.start  # e2 ends within e1
            or e1.end > e2.start >= e1.start  # e2 starts within e1
            or e2.start < e1.start < e1.end < e2.end  # e1 lies inside e2
        )

    overlapping_events = {}
    # Order events by created time so that earlier bookings take precedence
    events = sorted(events, key=lambda item: item.created)

    for event in events:
        # An event that is cancelled or has already been declined no longer
        # occupies its slot and therefore cannot decline other events.
        if event.extended_uid in overlapping_events or event.is_cancelled:
            continue
        for compare_event in events:
            # Skip the event itself and events of the same series
            if event.uid == compare_event.uid:
                continue
            if compare_event.is_cancelled:
                continue
            if not _overlaps(event, compare_event):
                continue
            # A prioritized event is never declined here
            if compare_event.is_prioritized:
                if event.is_prioritized:
                    print(f"Skipping overlap between prioritized events: "
                          f"'{event.name}' and '{compare_event.name}'")
                else:
                    print(f"Skipping overlap between event '{event.name}' "
                          f"and prioritized event '{compare_event.name}'")
                continue
            entry = overlapping_events.get(compare_event.extended_uid)
            if entry:
                # Already declined: just record the additional overlap
                if event not in entry["overlaps_with"]:
                    entry["overlaps_with"].append(event)
            else:
                overlapping_events[compare_event.extended_uid] = {
                    "event": compare_event,
                    "overlaps_with": [event],
                }
    # NOTE(review): an event may decline others and later be declined itself
    # by a prioritized event created afterwards; handling that would require
    # evaluating priorities up front - confirm whether this matters.
    return overlapping_events
def send_mail_to_organizer(
        booking: DavEvent,
        overlap_bookings: list[DavEvent],
        event_overview: list[DavEvent],
        calendar: caldav.Calendar,
        email_template,
        other_bookings: list[DavEvent] | None = None,
        alternatives: list[Calendar] | None = None,
        is_test: bool = False
):
    """
    Send email to organizer of the event.

    The connection is configured through environment variables: SMTP_EMAIL,
    SMTP_PASSWORD and SMTP_SERVER are required; SMTP_USER_NAME (defaults to
    SMTP_EMAIL), SMTP_PORT, SMTP_STARTTLS, SMTP_SENDER_NAME and SMTP_BCC are
    optional.

    :param booking: Booking that was declined
    :param overlap_bookings: List of bookings which overlap with the declined event
    :param event_overview: Sorted list of all events in the calendar, including the declined event and overlaps
    :param calendar: Calendar whose display name is used in the email
    :param email_template: Email template (subject, plaintext, html) to use for the email
    :param other_bookings: List of other bookings at the same day(s) as the declined booking
    :param alternatives: List of alternative calendars (rooms) for the time slot of the declined booking
    :param is_test: If True, do not send the email, just print it
    :raises Exception: If the required SMTP environment variables are not set
    """
    if is_test:
        print("Not sending email in test mode.")

    # Check if environment variables for SMTP are set
    smtp_email = os.getenv('SMTP_EMAIL')
    smtp_password = os.getenv('SMTP_PASSWORD')
    smtp_server = os.getenv('SMTP_SERVER')
    if not all([smtp_email, smtp_password, smtp_server]):
        raise Exception("SMTP environment variables are not set.")
    # Both transport modes log in with the same user name.
    smtp_user = os.getenv("SMTP_USER_NAME", smtp_email)

    recipient = booking.organizer
    # Prepare the email content
    context = {
        "booking": booking,
        "overlap_bookings": overlap_bookings,
        "other_bookings": other_bookings if other_bookings else [],
        "overview": event_overview,
        "alternatives": alternatives if alternatives else [],
        "calendar_name": calendar.get_display_name(),
    }

    # Create the email message
    message = MIMEMultipart("alternative")
    message["From"] = email.utils.formataddr((os.getenv('SMTP_SENDER_NAME', 'Room Booking'), smtp_email))
    message["To"] = email.utils.formataddr((None, recipient))
    message["Date"] = format_datetime(datetime.now().astimezone(tz))

    # BCC recipients are only added to the SMTP envelope. sendmail() transmits
    # the serialized message verbatim, so a "Bcc" header would be visible to
    # every recipient.
    recipients = [recipient]
    if bcc := os.getenv("SMTP_BCC"):
        recipients += [addr.strip() for addr in bcc.split(",") if addr.strip()]

    # Render subject and body from the email template
    # NOTE(review): the HTML part is rendered without autoescaping although
    # the context contains user-supplied event data - consider autoescape.
    message["Subject"] = Template(email_template.subject).render(**context)
    plaintext = Template(email_template.plaintext).render(**context)
    html = Template(email_template.html).render(**context)
    message.attach(MIMEText(plaintext, "plain", "utf-8"))
    message.attach(MIMEText(html, "html", "utf-8"))

    # Create a secure SSL context
    ssl_context = ssl.create_default_context()

    # Any value other than an explicit "off" value enables STARTTLS, so that
    # e.g. SMTP_STARTTLS=false no longer switches it on.
    use_starttls = os.getenv('SMTP_STARTTLS', "").strip().lower() not in (
        "", "0", "false", "no", "off")

    # Send the email with starttls or SSL based on environment variable
    print(f"Sending email to {', '.join(recipients)} with subject: {message['Subject']}")
    if use_starttls:
        print("Using STARTTLS for SMTP connection.")
        if not is_test:
            with smtplib.SMTP(smtp_server,
                              int(os.getenv('SMTP_PORT', 587))) as server:
                server.starttls(context=ssl_context)
                server.login(smtp_user, smtp_password)
                server.sendmail(smtp_email, recipients, message.as_string())
    else:
        print("Using SSL for SMTP connection.")
        if not is_test:
            with smtplib.SMTP_SSL(smtp_server,
                                  int(os.getenv('SMTP_PORT', 465)),
                                  context=ssl_context) as server:
                server.login(smtp_user, smtp_password)
                server.sendmail(smtp_email, recipients, message.as_string())
def find_other_bookings(event: DavEvent, calendar: caldav.Calendar) -> list[DavEvent]:
    """
    Find other bookings in the calendar that are at the same day(s) as the declined event.

    Cancelled events, events missing required fields, events that cannot be
    parsed and the given event itself are left out.

    :param event: The declined event.
    :param calendar: The calendar to search in.
    :return: List of DavEvent objects.
    """
    # Get all events in the calendar for the days of the declined event
    events_fetched = calendar.search(
        start=event.start.date(),
        end=event.end.date() + timedelta(days=1),
        event=True,
        expand=True,
        split_expanded=True,
    )

    other_bookings = []
    for fetched in events_fetched:
        # A single malformed event must not abort the whole lookup
        try:
            candidate = DavEvent(fetched)
        except Exception as e:
            print(f"Error creating DavEvent object: {e}")
            continue
        # Check required fields first: the extended UID is built from the
        # start and end time of the event.
        if candidate.missing_required_fields or candidate.is_cancelled:
            continue
        if candidate.extended_uid == event.extended_uid:
            continue
        other_bookings.append(candidate)
    return other_bookings
def find_alternatives(event: DavEvent, principal: Principal,
                      target_calendars: list) -> list[Calendar]:
    """
    Find alternative rooms for the time slot of the declined event.

    A calendar qualifies if it is one of ``target_calendars``, is configured
    as an alternative of the event's calendar and has no active event in the
    time slot of the declined event. Calendars that cannot be searched are
    not offered.

    :param event: The event to find alternatives for.
    :param principal: The principal to search for alternative rooms.
    :param target_calendars: Calendars administrated by the current user
    :return: List of alternative calendars that are available for the time slot.
    """
    event_calendar_id = event.obj.parent.id
    administrated = {calendar.name: calendar for calendar in target_calendars}

    # Only the alternatives of the event's own calendar are needed
    event_calendar = administrated.get(event_calendar_id)
    if event_calendar is None:
        alternative_names = []
    else:
        alternative_names = [c.name for c in event_calendar.alternatives.all()]
    print("Alternative calendars for event calendar: "
          f"{', '.join(alternative_names)}")

    alternatives = []
    # No need to query the server if there is nothing to check
    calendars = principal.calendars() if alternative_names else []
    for calendar in calendars:
        # Skip the calendar of the declined event
        if calendar.id == event_calendar_id:
            continue
        # Skip if the calendar is not administrated by the current user
        if calendar.id not in administrated:
            continue
        # Skip if the calendar is not an alternative to the calendar of the
        # declined event
        if calendar.id not in alternative_names:
            continue

        # Search for events in the alternative calendar that overlap with the
        # declined event
        try:
            events_fetched = calendar.search(
                start=event.start,
                end=event.end,
                event=True,
                expand=True,
                split_expanded=True,
            )
        except Exception as e:
            # Availability unknown: do not offer this calendar
            print(f"--- Failed to fetch events for calendar: {calendar.id}: {e}")
            continue

        # The calendar is blocked by any event that is neither cancelled nor
        # missing required fields; unparsable events are ignored.
        is_blocked = False
        for fetched in events_fetched:
            try:
                candidate = DavEvent(fetched)
            except Exception as e:
                print(f"Error creating DavEvent object: {e}")
                continue
            if not candidate.is_cancelled and not candidate.missing_required_fields:
                is_blocked = True
                break
        if not is_blocked:
            alternatives.append(calendar)

    print("Available alternative calendars for event: "
          f"{', '.join([a.id for a in alternatives])}")
    return alternatives