import email.utils import json import os import re import smtplib import ssl from datetime import datetime, date, timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import format_datetime from pprint import pprint import caldav import pytz from caldav import CalendarObjectResource, Principal, Calendar from jinja2 import Template tz = pytz.timezone(os.getenv("TIME_ZONE", "Europe/Berlin")) class DavEvent: """ Wrapper for calendar events fetched from a CalDAV server. """ uid: str name: str start: datetime end: datetime duration: timedelta created: datetime status: str organizer: str calendar: Calendar obj: CalendarObjectResource missing_required_fields: list def __init__(self, event: CalendarObjectResource): """ Initialize a DavEvent object. :param event: CalendarObjectResource object representing the event. """ for component in event.icalendar_instance.walk(): if component.name == "VEVENT": # Check if the event has all required fields required = { "uid": component.get("uid", False), "dtstart": component.get("dtstart", False), "dtend": component.get("dtend", False), "dtstamp": component.get("dtstamp", False), } if not all(required.values()): self.missing_required_fields = [k for k, v in required.items() if v is False] else: self.missing_required_fields = [] self.uid = event.icalendar_component.get("uid") self.name = event.icalendar_component.get("summary", "No Name") self.start = self._handle_date(event.icalendar_component.get("dtstart").dt) self.end = self._handle_date(event.icalendar_component.get("dtend").dt) self.duration = self.end - self.start self.created = self._handle_date(event.icalendar_component.get("dtstamp").dt) self.status = event.icalendar_component.get("status", "CONFIRMED") self.organizer = event.icalendar_component.get("organizer", "").replace("mailto:", "").strip() self.calendar = event.parent self.obj = event self.is_deprioritized = False self.is_deleted = False def __hash__(self): return hash(self.extended_uid) def __eq__(self, other): if not isinstance(other, DavEvent): return NotImplemented return self.extended_uid == other.extended_uid @property def extended_uid(self): """ Create a more unique identifier for the event. This is necessary to distinguish between events of the same series, which have the same UID but different start and end times. :return: Unique identifier string """ # Fallback to calendar id if UID is not set uid = self.uid if self.uid else self.calendar.id return f"{uid}_{int(self.start.timestamp())}_{int(self.end.timestamp())}" @property def is_recurring(self) -> bool: """ Check if the event is a recurring event. :return: True if the event is recurring, False otherwise. """ return 'RECURRENCE-ID' in self.obj.icalendar_component @property def is_cancelled(self) -> bool: """ Check if the event is a cancelled event. :return: True if the event is cancelled, False otherwise. """ return self.obj.icalendar_component.get("status") == "CANCELLED" @property def is_prioritized(self) -> bool: """ Check if the event is a prioritized event by finding and validating the priority token. :return: """ from booking import PriorityEventToken description = self.obj.icalendar_component.get("DESCRIPTION", "") uid = self.uid # Check if there is a token in the event description token_pattern = r"\#[A-Z0-9]{12}\#" match = re.search(token_pattern, str(description)) if match: token = match.group() print(f"Priority token found in event description: {token}") token_obj = PriorityEventToken.objects.get(token=token) # Check if the token object exists in the database if not token_obj: print(f"Priority token '{token}' not found in database.") return False # If token is already used, verify signature if token_obj.is_used: # Check if the signature is valid if token_obj.validate_signature(uid): print(f"Priority token '{token}' is valid.") return True else: print(f"Priority token '{token}' is invalid.") # TODO: Notify about invalid token usage return False # If the token hasn't been used yet, redeem it else: print(f"Redeeming priority token '{token}' for event '{uid}'.") token_obj.redeem(uid) # TODO: Notify about token redemption return True # If no token is found, return False return False def delete(self): """ Delete the event from the calendar. :return: """ self.is_deleted = True self.obj.delete() def cancel(self): """ Cancel the event. :return: """ self.obj.icalendar_component["status"] = "CANCELLED" self.obj.save(no_create=True, increase_seqno=False) def serialize(self) -> dict[str, str | datetime | timedelta]: """ Serialize the event to a dictionary. :return: """ return { "uid": self.uid, "name": self.name, "start": self.start, "end": self.end, "duration": self.duration, "created": self.created, "status": self.status, "organizer": self.organizer, "calendar_id": self.calendar.id, "calendar_name": self.calendar.name, "is_cancelled": self.is_cancelled, "is_recurring": self.is_recurring, "is_prioritized": self.is_prioritized, "missing_required_fields": self.missing_required_fields, } def dumps(self, indent=4, datetime_format="%Y-%m-%d %X") -> str: """ Dump a json string with the event data. :return: string with json data """ return json.dumps({ "uid": self.uid, "name": self.name, "start": self.start.strftime(datetime_format), "end": self.end.strftime(datetime_format), "duration": self.duration.total_seconds(), "created": self.created.strftime(datetime_format), "status": self.status, "organizer": self.organizer, "calendar_id": self.calendar.id, "calendar_name": self.calendar.name, "is_cancelled": self.is_cancelled, "is_recurring": self.is_recurring, "is_prioritized": self.is_prioritized, "missing_required_fields": self.missing_required_fields, }, indent=indent) @staticmethod def _handle_date(date_value: datetime | date) -> datetime: """ Date objects will be converted to datetime objects in the specified timezone. """ if isinstance(date_value, datetime): date_value = date_value.astimezone(tz) else: date_value = tz.localize(datetime.combine(date_value, datetime.min.time())) return date_value def clear(target_calendars: list, is_test: bool=False) -> dict: """ Clear overlapping and cancelled events in calendars. :param target_calendars: List of calendars to check for overlaps. :param is_test: Do not delete events, just print what would be deleted. :return: Dictionary with results of the operation. """ print(f"--- Clear bookings.{' Test mode enabled.' if is_test else ''}") results = { "deleted_cancelled_events": [], "deleted_overlapping_events": [], "cancelled_overlapping_recurring_events": [], "ignored_overlapping_events": [], "test_mode": is_test, } dav_client = caldav.DAVClient( url=target_calendars[0].url, username=target_calendars[0].username, password=target_calendars[0].calendar_password, ) principal = dav_client.principal() # Filter calendars to only those that are in the target_calendars list # Note: The id of the objects returned by principal.calendars() corresponds # to the name of the Django Calendar model instances. tcal_by_name = {c.name: c for c in target_calendars} calendars = [cal for cal in principal.calendars() if cal.id in tcal_by_name.keys()] if not calendars: print("--- No calendars to clear overlaps in. Exiting.") return results for calendar in calendars: # Get events from calendar print(f"--- Clearing cancelled bookings and overlaps in calendar: {calendar.id}") horizon = tcal_by_name[calendar.id].auto_clear_overlap_horizon_days try: events_fetched = calendar.search( start=datetime.now(), end=date.today() + timedelta(days=horizon), event=True, expand=True, split_expanded=True, ) except Exception as e: print(f"--- Failed to fetch events for calendar: {calendar.id}: {e}") continue # Create DavEvent objects from fetched events events = [] for event in events_fetched: try: events.append(DavEvent(event)) except ValueError as e: print(f"Error creating DavEvent object: {e}") continue # Filter out events that are missing required fields events_missing_required_fields = [ event for event in events if event.missing_required_fields ] for event in events_missing_required_fields: result = { "uid": event.uid or "No UID", "name": event.name or "No Name", "reason": f"Missing required fields: {', '.join(event.missing_required_fields)}", "calendar": calendar.id, } results["ignored_overlapping_events"].append(result) print("Skipping event:") pprint(result) print("------") continue events = [event for event in events if not event.missing_required_fields] # Delete cancelled non-recurring events if not in test mode for event in events: if event.is_cancelled and not event.is_recurring: if not is_test: event.obj.delete() result = { "uid": event.uid, "name": event.name or "No Name", "calendar": event.calendar.id, } results["deleted_cancelled_events"].append(result) print("Deleted cancelled event:") pprint(result) print("------") # Find overlapping events overlapping_events = find_overlapping_events(events) overlapping_events_json = json.dumps([json.loads(o.get("event").dumps()) for _, o in overlapping_events.items()], indent=2) print(f"Found overlapping events:\n{overlapping_events_json}") # Delete overlapping events and send emails to organizers for overlap in overlapping_events.values(): event = overlap["event"] overlaps_with = overlap["overlaps_with"] event.is_deprioritized = any(ov.is_prioritized for ov in overlaps_with) result = { "event": event.serialize(), "overlaps_with": [ov.serialize() for ov in overlaps_with], "calendar": calendar.id, } try: # If this is not a recurring event, we can delete it directly if not event.is_recurring: # but only if we are not in test mode if not is_test: event.delete() result["cancellation_type"] = "deleted" results["deleted_overlapping_events"].append(result) print("Deleted overlapping event:") # If this is a recurring event, and not already cancelled, # we need to cancel it now elif not event.is_cancelled: if not is_test: event.cancel() is_cancelled = True result["cancellation_type"] = "cancelled" results["cancelled_overlapping_recurring_events"].append(result) print("Cancelled overlapping recurring event:") # Get all events of the affected calendar for the days of the # declined event other_bookings = find_other_bookings(event, calendar) # Find free alternative rooms for the time slot of the declined event alternatives = find_alternatives(event, principal) # Create a set for all events as an overview event.is_declined = True event.is_overlapping = False event.is_other_booking = False overview = {event} # Add the overlapping events to the overview for ov in overlaps_with: ov.is_declined = False ov.is_overlapping = True ov.is_other_booking = False overview.add(ov) # Add the other bookings to the overview for ob in other_bookings: ob.is_declined = True ob.is_overlapping = False ob.is_other_booking = True overview.add(ob) # Make a list from the set to ensure order overview = list(overview) # Sort the overview by start time overview.sort(key=lambda ev: ev.start) # Send email to organizer of the event if not in test mode email_template = tcal_by_name[calendar.id].email_template if email_template: try: send_mail_to_organizer( booking=event, overlap_bookings=overlaps_with, event_overview=overview, other_bookings=other_bookings, alternatives=alternatives, calendar=calendar, email_template=email_template, is_test=is_test, ) result["email_sent"] = True except Exception as e: print("Failed to send email to organizer for event " f"{event.name}: {e}") result["email_sent"] = False else: print(f"No email template found for calendar {calendar.id}. " "Skipping email sending.") result["email_sent"] = False # Print results pprint(result) print("------") except Exception as e: print(f"Failed to delete event {event.uid}: {e}") print("------") continue print(f"--- Clearing completed.{' Test mode enabled.' if is_test else ''}") return results def find_overlapping_events(events: list[DavEvent]) -> dict: """ Find overlapping events. :param events: List of events to check for overlaps. :return: Dictionary of overlapping events with their UIDs as keys and a dictionary containing the event and the event it overlaps with as values. """ overlapping_events = {} # Order events by created time events = sorted(events, key=lambda item: item.created) # Define lambda functions to check for overlaps event_starts_later = lambda e1, e2: e1.end >= e2.end > e1.start event_starts_earlier = lambda e1, e2: e1.end > e2.start >= e1.start event_inbetween = lambda e1, e2: e2.start < e1.start < e1.end < e2.end # Find overlapping events for event in events: # Skip if the event is already in the overlapping events dictionary # or if it is already canceled if overlapping_events.get(event.extended_uid) or event.is_cancelled: continue for compare_event in events: # Skip if the compare_event is same event or belongs to the same # series as the compare event if event.uid == compare_event.uid: continue # Skip if the compare_event is already in the overlapping events # dictionary or if it is already canceled if (overlapping_events.get(compare_event.extended_uid) or compare_event.is_cancelled): continue # Check if the events overlap if event_starts_later(event, compare_event) or \ event_starts_earlier(event, compare_event) or \ event_inbetween(event, compare_event): # If both events are prioritized, skip the overlap if compare_event.is_prioritized and event.is_prioritized: print(f"Skipping overlap between prioritized events: " f"'{event.name}' and '{compare_event.name}'") continue # If the compare_event is prioritized and the event is not, # skip the overlap elif compare_event.is_prioritized and not event.is_prioritized: print(f"Skipping overlap between event '{event.name}' " f"and prioritized event '{compare_event.name}'") continue # Add to overlapping events dictionary if e := overlapping_events.get(event.extended_uid): # If the event is already in the dictionary, extend the overlaps e["overlaps_with"].append(event) else: # Create a new entry for the overlapping event overlapping_events[event.extended_uid] = { "event": compare_event, "overlaps_with": [event] } return overlapping_events def send_mail_to_organizer( booking: DavEvent, overlap_bookings: list[DavEvent], event_overview: list[DavEvent], calendar: caldav.Calendar, email_template, other_bookings: list[DavEvent]=None, alternatives: list[Calendar]=None, is_test: bool=False ): """ Send email to organizer of the event. :param booking: Booking that was declined :param overlap_bookings: List of bookings which overlap with the declined event :param event_overview: Sorted list of all events in the calendar, including the declined event and overlaps :param calendar: Calendar to send the email from another event, False otherwise :param email_template: Email template to use for the email :param other_bookings: List of other bookings at the same day(s) as the declined booking :param alternatives: List of alternative calendars (rooms) for the time slot of the declined booking :param is_test: If True, do not send the email, just print it """ if is_test: print("Not sending email in test mode.") # Check if environment variables for SMTP are set if not all([ os.getenv('SMTP_EMAIL'), os.getenv('SMTP_PASSWORD'), os.getenv('SMTP_SERVER'), ]): raise Exception("SMTP environment variables are not set.") recipient = booking.organizer # Prepare the email content context = { "booking": booking, "overlap_bookings": overlap_bookings, "other_bookings": other_bookings if other_bookings else [], "overview": event_overview, "alternatives": alternatives if alternatives else [], "calendar_name": calendar.get_display_name(), } recipients = [recipient] # Create the email message message = MIMEMultipart("alternative") message["From"] = email.utils.formataddr((os.getenv('SMTP_SENDER_NAME', 'Room Booking'), os.getenv('SMTP_EMAIL'))) message["To"] = email.utils.formataddr((None, recipient)) if bcc := os.getenv("SMTP_BCC"): message["Bcc"] = bcc recipients += bcc.split(",") message["Date"] = format_datetime(datetime.now().astimezone(tz)) # Try to render subject and body from the email template message["Subject"] = Template(email_template.subject).render(**context) plaintext = Template(email_template.plaintext).render(**context) html = Template(email_template.html).render(**context) message.attach(MIMEText(plaintext, "plain", "utf-8")) message.attach(MIMEText(html, "html", "utf-8")) # Create a secure SSL context ssl_context = ssl.create_default_context() # Send the email with starttls or SSL based on environment variable print(f"Sending email to {', '.join(recipients)} with subject: {message['Subject']}") if os.getenv('SMTP_STARTTLS', False): print("Using STARTTLS for SMTP connection.") if not is_test: with smtplib.SMTP(os.getenv('SMTP_SERVER'), os.getenv('SMTP_PORT', 587)) as server: server.starttls(context=ssl_context) server.login(os.getenv(" ", os.getenv("SMTP_EMAIL")), os.getenv("SMTP_PASSWORD")) server.sendmail(os.getenv("SMTP_EMAIL"), recipients, message.as_string()) else: print("Using SSL for SMTP connection.") if not is_test: with smtplib.SMTP_SSL(os.getenv('SMTP_SERVER'), os.getenv('SMTP_PORT', 465), context=ssl_context) as server: server.login(os.getenv("SMTP_USER_NAME", os.getenv("SMTP_EMAIL")), os.getenv("SMTP_PASSWORD")) server.sendmail(os.getenv("SMTP_EMAIL"), recipients, message.as_string()) def find_other_bookings(event: DavEvent, calendar: caldav.Calendar) -> list[DavEvent]: """ Find other bookings in the calendar that are at the same day(s) as the declined event. :param event: The event to check for overlaps. :param calendar: The calendar to search in. :return: List of DavEvent objects. """ # Get all events in the calendar for the days of the declined event start_date = event.start.date() end_date = event.end.date() events_fetched = calendar.search( start=start_date, end=end_date + timedelta(days=1), event=True, expand=True, split_expanded=True, ) # Create DavEvent objects from fetched events all_events = [DavEvent(event) for event in events_fetched] # Filter out cancelled events and events that are # - the exact same as the given event # - that do not have all required fields return [e for e in all_events if not e.is_cancelled and e.extended_uid != event.extended_uid and not e.missing_required_fields] def find_alternatives(event: DavEvent, principal: Principal) -> list[Calendar]: """ Find alternative rooms for the time slot of the declined event. :param event: The event to find alternatives for. :param principal: The principal to search for alternative rooms. :return: List of alternative calendars that are available for the time slot. """ # Get all calendars of the principal calendars = principal.calendars() alternatives = [] for calendar in calendars: # Skip the calendar of the declined event if calendar.id == event.obj.parent.id: continue # Search for events in the alternative calendar that overlap with the # declined event events_fetched = calendar.search( start=event.start, end=event.end, event=True, expand=True, split_expanded=True, ) # Create DavEvent objects from fetched events all_events = [DavEvent(event) for event in events_fetched] # Filter out cancelled events and events that are missing required fields blocked = [e for e in all_events if not e.is_cancelled and not e.missing_required_fields] # If there are no blocked events, the calendar is available if not blocked: alternatives.append(calendar) return alternatives