382 lines
13 KiB
Python
382 lines
13 KiB
Python
import os
|
|
import secrets
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from uuid import UUID
|
|
import markdown
|
|
|
|
import requests
|
|
import shortuuid
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.core.validators import URLValidator
|
|
from django.db import models
|
|
from django.shortcuts import render, get_object_or_404
|
|
from django.utils import timezone
|
|
from ics import Calendar as ICS_Calendar, Event as ICS_Event
|
|
from nanodjango import Django
|
|
from shortuuid.django_fields import ShortUUIDField
|
|
|
|
from clear_overlaps import clear_overlaps as clear
|
|
|
|
DEBUG = os.getenv("DJANGO_DEBUG", False)
|
|
SECRET_KEY = os.getenv("DJANGO_SECRET_KEY") \
|
|
if os.getenv("DJANGO_SECRET_KEY") \
|
|
else secrets.token_hex(40) if DEBUG else None
|
|
BASE_DIR = Path(__file__).resolve().parent
|
|
DATA_DIR = Path(os.getenv("DJANGO_DATA_DIR", BASE_DIR.parent / "data"))
|
|
|
|
# Check if all required values are set
|
|
if not SECRET_KEY and not DEBUG:
|
|
print("DJANGO_SECRET_KEY is not set")
|
|
exit(1)
|
|
|
|
ALLOWED_HOSTS = [host.strip() for host in
|
|
os.getenv("DJANGO_ALLOWED_HOSTS", "").split(",")]
|
|
|
|
# Set CSRF_TRUSTED_ORIGINS to allow requests from the allowed hosts
|
|
CSRF_TRUSTED_ORIGINS = [
|
|
host if host.startswith(("http://", "https://"))
|
|
else f"https://{host}"
|
|
for host in ALLOWED_HOSTS
|
|
]
|
|
|
|
if DEBUG:
|
|
print(f"ALLOWED_HOSTS: {ALLOWED_HOSTS}")
|
|
print(f"CSRF_TRUSTED_ORIGINS: {CSRF_TRUSTED_ORIGINS}")
|
|
|
|
# Initialise nanodjango
|
|
app = Django(
|
|
SECRET_KEY=SECRET_KEY,
|
|
TIME_ZONE=os.getenv("TIME_ZONE", "Europe/Berlin"),
|
|
ALLOWED_HOSTS=["localhost"] + ALLOWED_HOSTS,
|
|
CSRF_TRUSTED_ORIGINS=CSRF_TRUSTED_ORIGINS,
|
|
SQLITE_DATABASE=DATA_DIR / "db.sqlite3",
|
|
DEBUG=DEBUG,
|
|
TEMPLATES_DIR=BASE_DIR / "templates",
|
|
STATICFILES_DIRS=[
|
|
BASE_DIR / "static",
|
|
],
|
|
)
|
|
|
|
# Import ninja after nanodjango has been initialised to avoid this error:
|
|
# https://github.com/vitalik/django-ninja/issues/1169
|
|
from ninja import NinjaAPI, ModelSchema
|
|
from ninja.security import HttpBearer, django_auth
|
|
from ninja.errors import HttpError
|
|
|
|
@app.admin
|
|
class EmailTemplate(models.Model):
|
|
"""
|
|
Email template model to store email templates for sending notifications.
|
|
"""
|
|
id = models.AutoField(primary_key=True)
|
|
name = models.CharField(max_length=100, unique=True)
|
|
subject = models.CharField(max_length=200)
|
|
body = models.TextField(help_text="""\
|
|
Email template to use for sending notifications about deleted events.<br>
|
|
If not set, no email will be sent.<br>
|
|
Jinja2 template syntax can be used in the subject and body.<br>
|
|
Available variables:<br>
|
|
- event_name: The name of the event<br>
|
|
- event_start: The start time (datetime) of the event<br>
|
|
- event_end: The end time (datetime) of the event<br>
|
|
- event_created: The datetime the event was created<br>
|
|
- event_is_deleted: Whether the event was deleted<br>
|
|
- event_is_cancelled: Whether the event was cancelled<br>
|
|
- event_is_recurring: Whether the event is recurring<br>
|
|
- event_organizer: The organizer of the event<br>
|
|
- overlap_event_name: The name of the overlap event<br>
|
|
- overlap_event_start: The start time (datetime) of the overlap event<br>
|
|
- overlap_event_end: The end time (datetime) of the overlap event<br>
|
|
- overlap_event_created: The datetime the overlap event was created<br>
|
|
- overlap_event_organizer: The organizer of the overlap event<br>
|
|
- calendar_name: The name of the calendar""")
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
@app.admin
|
|
class Calendar(models.Model):
|
|
"""
|
|
A calendar model to store events.
|
|
"""
|
|
id = models.AutoField(primary_key=True)
|
|
name = models.CharField(max_length=100, editable=False)
|
|
url = models.URLField(validators=[URLValidator])
|
|
calendar_password = models.CharField(max_length=29)
|
|
users = models.ManyToManyField(app.settings.AUTH_USER_MODEL,
|
|
related_name='calendars')
|
|
auto_clear_bookings = models.BooleanField(default=False)
|
|
auto_clear_overlap_horizon_days = models.IntegerField(default=30)
|
|
email_template = models.ForeignKey(EmailTemplate, on_delete=models.SET_NULL,
|
|
null=True, blank=True,
|
|
related_name='calendars',
|
|
help_text="""\
|
|
Email template to use for sending notifications about deleted events.<br>
|
|
If not set, no email will be sent.""")
|
|
|
|
def save(self, *args, **kwargs):
|
|
self.url = self.url.rstrip("/")
|
|
self.name = self.get_name()
|
|
return super().save(*args, **kwargs)
|
|
|
|
def get_name(self):
|
|
return self.url.split("/")[-1]
|
|
|
|
@property
|
|
def username(self):
|
|
return self.url.split("/")[-2]
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
|
|
@app.admin(ordering=("start", "end", "name"), list_filter=("cancelled",))
|
|
class Event(models.Model):
|
|
"""
|
|
Event model to store events in a calendar and send them to a CalDAV server.
|
|
"""
|
|
id = ShortUUIDField(primary_key=True, editable=False, unique=True,
|
|
default=shortuuid.uuid)
|
|
calendar = models.ForeignKey(Calendar, on_delete=models.CASCADE,
|
|
related_name='events')
|
|
name = models.CharField(max_length=100)
|
|
path = models.CharField(max_length=200, editable=False)
|
|
created = models.DateTimeField()
|
|
start = models.DateTimeField()
|
|
end = models.DateTimeField()
|
|
ics = models.TextField(blank=True)
|
|
sent = models.DateTimeField(blank=True, null=True, editable=False)
|
|
cancelled = models.DateTimeField(blank=True, null=True, editable=False)
|
|
|
|
def save(self, *args, **kwargs):
|
|
self.path = f"{self.calendar.url}/{self.uuid}"
|
|
if self.created is None:
|
|
self.created = datetime.now(tz=timezone.get_current_timezone())
|
|
self.ics = self.create_ics()
|
|
|
|
# Send the event to the CalDAV server if it has not been cancelled yet
|
|
if not self.cancelled:
|
|
try:
|
|
self.send()
|
|
except requests.HTTPError as e:
|
|
raise HttpError(401, str(e))
|
|
|
|
return super().save(*args, **kwargs)
|
|
|
|
def delete(self, *args, **kwargs):
|
|
if self.sent:
|
|
self.cancel()
|
|
return super().delete(*args, **kwargs)
|
|
|
|
def send(self):
|
|
"""
|
|
Sends a PUT request to the CalDAV server to create the event.
|
|
:return:
|
|
"""
|
|
request = requests.put(
|
|
self.path,
|
|
headers={"Content-Type": "text/calendar; charset=utf-8"},
|
|
data=bytes(self.ics, 'utf-8'),
|
|
auth=(self.calendar.username, self.calendar.calendar_password),
|
|
timeout=10,
|
|
)
|
|
request.raise_for_status()
|
|
self.sent = datetime.now(tz=timezone.get_current_timezone())
|
|
|
|
def cancel(self):
|
|
"""
|
|
Sends a DELETE request to the CalDAV server to cancel the event.
|
|
:return:
|
|
"""
|
|
try:
|
|
request = requests.delete(
|
|
self.path,
|
|
headers={"Content-Type": "text/calendar; charset=utf-8"},
|
|
auth=(self.calendar.username, self.calendar.calendar_password),
|
|
timeout=10,
|
|
)
|
|
request.raise_for_status()
|
|
except requests.HTTPError as e:
|
|
raise HttpError(e.response.status_code, str(e))
|
|
self.cancelled = datetime.now(tz=timezone.get_current_timezone())
|
|
self.save()
|
|
|
|
def create_ics(self) -> str:
|
|
"""
|
|
Create ics string
|
|
:return:
|
|
"""
|
|
c = ICS_Calendar()
|
|
e = ICS_Event()
|
|
e.uid = self.uuid.__str__()
|
|
e.name = self.name
|
|
e.created = self.created
|
|
e.begin = self.start
|
|
e.end = self.end
|
|
e.description = f"Booking-ID: {self.id}"
|
|
c.events.add(e)
|
|
return c.serialize()
|
|
|
|
@property
|
|
def uuid(self) -> UUID:
|
|
return shortuuid.decode(self.id.__str__())
|
|
|
|
def __str__(self):
|
|
string = f"{self.id} - {self.name} - ({self.start:%Y-%m-%d %H:%M} - {self.end:%Y-%m-%d %H:%M})"
|
|
return string if not self.cancelled else f"{string} - CANCELLED"
|
|
|
|
|
|
@app.admin(readonly_fields=('key',))
|
|
class APIKey(models.Model):
|
|
id = models.AutoField(primary_key=True)
|
|
user = models.ForeignKey(app.settings.AUTH_USER_MODEL,
|
|
on_delete=models.CASCADE,
|
|
related_name='api_keys')
|
|
key = models.CharField(max_length=80, unique=True, editable=False)
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
|
|
@classmethod
|
|
def generate_key(cls):
|
|
return secrets.token_hex(40)
|
|
|
|
def save(self, *args, **kwargs):
|
|
if not self.key:
|
|
self.key = self.generate_key()
|
|
return super().save(*args, **kwargs)
|
|
|
|
def authenticate(self, request, api_key):
|
|
if api_key == self.key:
|
|
return api_key
|
|
|
|
def __str__(self):
|
|
return f"{self.user.username}: {self.key[:4]:*<76}"
|
|
|
|
|
|
class AuthBearer(HttpBearer):
|
|
def authenticate(self, request, token):
|
|
return APIKey.objects.filter(key=token).first()
|
|
|
|
|
|
class EventSchemaIn(ModelSchema):
|
|
class Meta:
|
|
model = Event
|
|
fields = ["name", "start", "end"]
|
|
|
|
|
|
class EventSchemaOut(ModelSchema):
|
|
class Meta:
|
|
model = Event
|
|
fields = ["id"]
|
|
|
|
|
|
# Initialising a separate ninja API instead of using nanodjango's app.ninja
|
|
# instance allows us to access the ninja.security classes defined in the ninja
|
|
# package.
|
|
api = NinjaAPI(auth=[AuthBearer(), django_auth])
|
|
|
|
|
|
def get_user(request) -> app.settings.AUTH_USER_MODEL:
|
|
"""
|
|
Get the user either from the request or from the API key.
|
|
:param request:
|
|
:return:
|
|
"""
|
|
return request.user if not request.user.is_anonymous \
|
|
else APIKey.objects.get(key=request.auth.key).user
|
|
|
|
|
|
def get_markdown():
|
|
"""
|
|
Get the markdown content from the index.md file.
|
|
:return:
|
|
"""
|
|
with open(BASE_DIR / "index.md") as f:
|
|
md = markdown.Markdown(extensions=["fenced_code"])
|
|
return md.convert(f.read())
|
|
|
|
|
|
def get_version():
|
|
"""
|
|
Get the version of the app from the version.txt file.
|
|
:return:
|
|
"""
|
|
with open(BASE_DIR / "version.txt") as f:
|
|
return f.read().strip()
|
|
|
|
|
|
@api.get("/info")
|
|
@csrf_exempt
|
|
def info(request):
|
|
if request.user.is_anonymous:
|
|
user = APIKey.objects.get(key=request.auth.key).user
|
|
else:
|
|
user = request.user
|
|
return {"user": str(user),
|
|
"calendars": [room.name for room in user.calendars.all()]}
|
|
|
|
|
|
@api.post("/{calendar}/event", response={201: EventSchemaOut})
|
|
@csrf_exempt
|
|
def create_event(request, calendar: str, event: EventSchemaIn):
|
|
user = get_user(request)
|
|
cal = get_object_or_404(Calendar, name=calendar)
|
|
if user not in cal.users.all():
|
|
raise HttpError(401,
|
|
f"User not authorised to create event in calendar '{cal.name}'")
|
|
event = Event.objects.create(calendar=cal, **event.dict())
|
|
return 201, event
|
|
|
|
|
|
@api.delete("/{calendar}/event/{event_id}", response={204: None})
|
|
@csrf_exempt
|
|
def delete_event(request, calendar: str, event_id: str):
|
|
user = get_user(request)
|
|
cal = get_object_or_404(Calendar, name=calendar)
|
|
if user not in cal.users.all():
|
|
raise HttpError(401,
|
|
f"User not authorised to delete event in calendar '{cal.name}'")
|
|
event = get_object_or_404(Event, id=event_id)
|
|
event.cancel()
|
|
return 204, None
|
|
|
|
@api.delete("/clear-bookings", response={200: dict, 204: None, 401: None, 404: None})
|
|
@csrf_exempt
|
|
def clear_bookings(request, calendar: str = None, test: bool = False):
|
|
user = get_user(request)
|
|
|
|
# Get optional calendar name from the request
|
|
if calendar:
|
|
cal = get_object_or_404(Calendar, name=calendar,
|
|
auto_clear_bookings=True)
|
|
if user not in cal.users.all():
|
|
raise HttpError(401,
|
|
f"User not authorised to clear bookings in calendar '{cal.name}'")
|
|
|
|
calendars = [cal]
|
|
|
|
# If no calendar is specified, get all calendars for the user that have
|
|
# auto_clear_bookings enabled
|
|
else:
|
|
calendars = user.calendars.filter(auto_clear_bookings=True)
|
|
|
|
if not calendars:
|
|
return 204, None # No bookings to clear
|
|
|
|
result = clear(list(calendars), is_test=test)
|
|
if all(len(r) == 0 for r in result.values() if type(r) is list):
|
|
return 204, None # No bookings to clear
|
|
else:
|
|
return 200, result
|
|
|
|
app.route("api/", include=api.urls)
|
|
|
|
@app.route("/")
|
|
@csrf_exempt
|
|
def home(request):
|
|
return render(request, "index.html", {
|
|
"content": get_markdown(),
|
|
"version": get_version(),
|
|
})
|