378 lines
14 KiB
Python
378 lines
14 KiB
Python
"""Webapp views."""
|
|
|
|
import json
|
|
import time
|
|
from jwt import decode
|
|
from jwt.exceptions import InvalidTokenError
|
|
from django.utils import timezone
|
|
from django.conf import settings
|
|
from django.http import HttpResponse, JsonResponse
|
|
from django.shortcuts import get_object_or_404
|
|
from django.views.decorators.http import require_http_methods
|
|
from django_filters import rest_framework as filters
|
|
from django.db.models import Prefetch
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
from django.db.utils import IntegrityError
|
|
from rest_framework import routers
|
|
from rest_framework.response import Response
|
|
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet
|
|
from rest_framework.filters import OrderingFilter, SearchFilter
|
|
from rest_framework.decorators import action
|
|
from rest_framework.permissions import (
|
|
IsAuthenticatedOrReadOnly,
|
|
BasePermission,
|
|
AllowAny,
|
|
IsAuthenticated,
|
|
)
|
|
from jsonschema import validate
|
|
from jsonschema.exceptions import ValidationError
|
|
|
|
from webapp.models import *
|
|
from webapp.serializers import *
|
|
from webapp.utils import admin_send_email_signupees, decode_base64_file
|
|
|
|
|
|
with open("./webapp/questionSchema.json", "r") as file:
|
|
QUESTION_SCHEMA = json.load(file)
|
|
|
|
|
|
class SignupPermission(BasePermission):
|
|
def has_permission(self, request, view):
|
|
if request.method == "POST":
|
|
return True
|
|
return request.user and request.user.is_authenticated
|
|
|
|
|
|
# -- REST API -- #
|
|
class RootView(routers.APIRootView):
|
|
permission_classes = [IsAuthenticatedOrReadOnly]
|
|
|
|
|
|
class EventViewSet(ModelViewSet):
|
|
queryset = Event.objects.filter(deleted=False)
|
|
ordering = ["start_time"]
|
|
serializer_class = EventSerializer
|
|
permission_classes = [IsAuthenticatedOrReadOnly]
|
|
filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter)
|
|
filter_fields = ("id", "tags", "visible", "signupForm")
|
|
search_fields = ("id", "tags", "visible", "signupForm")
|
|
|
|
def get_queryset(self):
|
|
|
|
# TODO: For create and update, this return old data in signupForm field (prefetched)...
|
|
if (
|
|
self.request.user.is_authenticated
|
|
or self.request.method == "POST"
|
|
or self.request.method == "PUT"
|
|
):
|
|
return Event.objects.filter(deleted=False).prefetch_related(
|
|
Prefetch(
|
|
"signupForm",
|
|
queryset=SignupForm.objects.filter(deleted=False),
|
|
to_attr="filtered_signup_forms",
|
|
)
|
|
)
|
|
|
|
since = self.request.query_params.get("since", None)
|
|
if since:
|
|
return (
|
|
Event.objects.filter(deleted=False, visible=True, end_time__gt=since)
|
|
.order_by("start_time")
|
|
.prefetch_related(
|
|
Prefetch(
|
|
"signupForm",
|
|
queryset=SignupForm.objects.filter(deleted=False, visible=True),
|
|
to_attr="filtered_signup_forms",
|
|
)
|
|
)
|
|
)
|
|
return (
|
|
Event.objects.filter(
|
|
deleted=False, visible=True, end_time__gt=timezone.now()
|
|
)
|
|
.order_by("start_time")
|
|
.prefetch_related(
|
|
Prefetch(
|
|
"signupForm",
|
|
queryset=SignupForm.objects.filter(deleted=False, visible=True),
|
|
to_attr="filtered_signup_forms",
|
|
)
|
|
)
|
|
)
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
raw_image = request.data.get("image", None)
|
|
if raw_image is not None:
|
|
image = decode_base64_file(raw_image)
|
|
request.data.update({"image": image})
|
|
return super().create(request, *args, **kwargs)
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
raw_image = request.data.get("image", None)
|
|
if raw_image is not None:
|
|
image = decode_base64_file(raw_image)
|
|
request.data.update({"image": image})
|
|
return super().update(request, *args, **kwargs)
|
|
|
|
def destroy(self, request, pk=None, *args, **kwargs):
|
|
try:
|
|
event = self.get_object()
|
|
event.deleted = True
|
|
event.save()
|
|
return JsonResponse(status=200, data={"message": "OK"})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(status=404, data={"error": f"Event {pk} not found"})
|
|
|
|
|
|
class SignupFormViewSet(ModelViewSet):
|
|
queryset = SignupForm.objects.filter(deleted=False)
|
|
ordering = ["start_time"]
|
|
serializer_class = SignupFormSerializer
|
|
permission_classes = [IsAuthenticatedOrReadOnly]
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
try:
|
|
schema = QUESTION_SCHEMA
|
|
validate(instance=request.data["questions"], schema=schema)
|
|
return super().create(request, *args, **kwargs)
|
|
except ValidationError as err:
|
|
return JsonResponse(status=400, data={"error": err.message})
|
|
|
|
def update(self, request, *args, **kwargs):
|
|
try:
|
|
schema = QUESTION_SCHEMA
|
|
validate(instance=request.data["questions"], schema=schema)
|
|
return super().update(request, *args, **kwargs)
|
|
except ValidationError as err:
|
|
return JsonResponse(status=400, data={"error": err.message})
|
|
|
|
def get_queryset(self):
|
|
if self.request.user.is_authenticated:
|
|
return SignupForm.objects.filter(deleted=False).order_by("start_time")
|
|
return SignupForm.objects.filter(deleted=False, visible=True).order_by(
|
|
"start_time"
|
|
)
|
|
|
|
def destroy(self, request, pk=None, *args, **kwargs):
|
|
try:
|
|
form = self.get_object()
|
|
form.deleted = True
|
|
form.save()
|
|
return JsonResponse(status=200, data={"message": "OK"})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(
|
|
status=404, data={"error": f"SignupForm {pk} not found"}
|
|
)
|
|
|
|
@action(detail=True, methods=["post"], permission_classes=[IsAuthenticated])
|
|
def sendemail(self, request, pk=None, *args, **kwargs):
|
|
subject = request.data["subject"]
|
|
content = request.data["content"]
|
|
mode = request.data["mode"]
|
|
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
filter = {"pk": pk}
|
|
signupForm = get_object_or_404(queryset, **filter)
|
|
if mode == "all":
|
|
admin_send_email_signupees(signupForm.signups, subject, content)
|
|
return JsonResponse(status=201, data={"message": "Email sent"})
|
|
elif mode == "actual":
|
|
admin_send_email_signupees(
|
|
signupForm.signups[: signupForm.quota], subject, content
|
|
)
|
|
return JsonResponse(status=201, data={"message": "Email sent"})
|
|
elif mode == "reserved":
|
|
admin_send_email_signupees(
|
|
signupForm.signups[signupForm.quota :], subject, content
|
|
)
|
|
return JsonResponse(status=201, data={"message": "Email sent"})
|
|
else:
|
|
return JsonResponse(status=400, data={"error": f"Bad mode '{mode}'"})
|
|
|
|
@action(detail=True, methods=["get"], permission_classes=[IsAuthenticated])
|
|
def signups(self, request, pk=None, *args, **kwargs):
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
filter = {"pk": pk}
|
|
signupForm = get_object_or_404(queryset, **filter)
|
|
return Response(SignupSerializer(signupForm.signups, many=True).data)
|
|
|
|
|
|
class SignupViewSet(ModelViewSet):
|
|
queryset = Signup.objects.filter(deleted=False)
|
|
serializer_class = SignupSerializer
|
|
permission_classes = [SignupPermission]
|
|
|
|
@action(detail=True, methods=["get", "put"], permission_classes=[AllowAny])
|
|
def edit(self, request, pk=None, *args, **kwargs):
|
|
uuid = request.query_params.get("uuid", None)
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
filter = {"pk": pk, "uuid": uuid}
|
|
get_object_or_404(queryset, **filter)
|
|
if request.method == "GET":
|
|
return self.retrieve(request, *args, **kwargs)
|
|
elif request.method == "PUT":
|
|
return self.partial_update(request, *args, **kwargs)
|
|
|
|
def create(self, request, *args, **kwargs):
|
|
# Temporary manual duplicate check as submit_id uniqueness is not enforced in deployment database
|
|
if "submit_id" in request.data and Signup.objects.filter(
|
|
submit_id=request.data["submit_id"]
|
|
):
|
|
return JsonResponse(
|
|
status=200, data={"message": "The submission has already been received"}
|
|
)
|
|
|
|
id = request.data["signupForm_id"]
|
|
try:
|
|
answer = request.data["answer"]
|
|
form = SignupForm.objects.get(id=id)
|
|
if form.isOpen:
|
|
# Throws ValidationError if not valid
|
|
validate(instance=answer, schema=form.schema)
|
|
return super().create(request, *args, **kwargs)
|
|
except ValidationError as inst:
|
|
return JsonResponse(status=400, data={"error": inst.message})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(
|
|
status=404, data={"error": f"SignupForm {id} not found"}
|
|
)
|
|
except IntegrityError:
|
|
return JsonResponse(
|
|
status=200, data={"message": "The submission has already been received"}
|
|
)
|
|
else:
|
|
return JsonResponse(
|
|
status=404, data={"error": f"SignupForm {id} not found"}
|
|
)
|
|
|
|
def partial_update(self, request, pk=None, *args, **kwargs):
|
|
try:
|
|
# ID & UUID validated in edit @action for normal users.
|
|
# This is otherwise open for authenticated users.
|
|
signup = self.get_object()
|
|
answer = request.data["answer"]
|
|
form = SignupForm.objects.get(id=signup.signupForm_id)
|
|
|
|
if form.visible:
|
|
# Throws ValidationError if not valid
|
|
validate(instance=answer, schema=form.schema)
|
|
return super().partial_update(request, *args, **kwargs)
|
|
except ValidationError as inst:
|
|
return JsonResponse(status=400, data={"error": inst.message})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(
|
|
status=404, data={"error": f"SignupForm {id} not found"}
|
|
)
|
|
else:
|
|
return JsonResponse(
|
|
status=404, data={"error": f"SignupForm {id} not found"}
|
|
)
|
|
|
|
def destroy(self, request, pk=None, *args, **kwargs):
|
|
try:
|
|
signup = self.get_object()
|
|
signup.deleted = True
|
|
signup.save()
|
|
return JsonResponse(status=200, data={"message": "OK"})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(status=404, data={"error": f"Signup {pk} not found"})
|
|
|
|
|
|
class SavedQuestionsViewSet(ModelViewSet):
|
|
queryset = TemplateQuestion.objects.filter(deleted=False)
|
|
serializer_class = SavedQuestionsSerializer
|
|
permission_classes = [IsAuthenticatedOrReadOnly]
|
|
|
|
def destroy(self, request, pk=None, *args, **kwargs):
|
|
try:
|
|
question = self.get_object()
|
|
question.deleted = True
|
|
question.save()
|
|
return JsonResponse(status=200, data={"message": "OK"})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(
|
|
status=404, data={"error": f"Template question {pk} not found"}
|
|
)
|
|
|
|
|
|
class FeedViewSet(ModelViewSet):
|
|
queryset = Feed.objects.filter(deleted=False)
|
|
serializer_class = FeedSerializer
|
|
permission_classes = [IsAuthenticatedOrReadOnly]
|
|
filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter)
|
|
filter_fields = ("id", "tags", "visible")
|
|
search_fields = ("id", "tags", "visible")
|
|
|
|
def get_queryset(self):
|
|
if self.request.user.is_authenticated:
|
|
return Feed.objects.filter(deleted=False).order_by("-publish_time")
|
|
else:
|
|
objs = Feed.objects.filter(deleted=False, visible=True).order_by(
|
|
"-publish_time"
|
|
)
|
|
|
|
# TODO: Bad filtering. Rewrite!
|
|
result_ids = []
|
|
for obj in objs:
|
|
if obj.autohide_enabled:
|
|
if obj.autohide > timezone.now():
|
|
result_ids.append(obj.id)
|
|
else:
|
|
result_ids.append(obj.id)
|
|
|
|
return Feed.objects.filter(id__in=result_ids).order_by("-publish_time")
|
|
|
|
def destroy(self, request, pk=None, *args, **kwargs):
|
|
try:
|
|
post = self.get_object()
|
|
post.deleted = True
|
|
post.save()
|
|
return JsonResponse(status=200, data={"message": "OK"})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(status=404, data={"error": f"Post {pk} not found"})
|
|
|
|
|
|
class TagsViewSet(ReadOnlyModelViewSet):
|
|
queryset = Tag.objects.all()
|
|
serializer_class = TagSerializer
|
|
permission_classes = [IsAuthenticatedOrReadOnly]
|
|
|
|
|
|
class JobAdViewSet(ModelViewSet):
|
|
queryset = JobAd.objects.filter(deleted=False)
|
|
serializer_class = JobAdSerializer
|
|
permission_classes = [IsAuthenticatedOrReadOnly]
|
|
|
|
def get_queryset(self):
|
|
if self.request.user.is_authenticated:
|
|
return JobAd.objects.filter(deleted=False)
|
|
return JobAd.objects.filter(
|
|
deleted=False, visible=True, autohide_at__gt=timezone.now()
|
|
)
|
|
|
|
def destroy(self, request, pk=None, *args, **kwargs):
|
|
try:
|
|
ad = self.get_object()
|
|
ad.deleted = True
|
|
ad.save()
|
|
return JsonResponse(status=200, data={"message": "OK"})
|
|
except ObjectDoesNotExist:
|
|
return JsonResponse(status=404, data={"error": f"Job Ad {pk} not found"})
|
|
|
|
|
|
@require_http_methods(["GET"])
|
|
def nginx_jwt_resp(request, *args, **kwargs):
|
|
accessKey = request.COOKIES.get("jwt_access", None)
|
|
if not accessKey:
|
|
return HttpResponse("", status=401)
|
|
try:
|
|
# This also verifies the signature.
|
|
# See https://pyjwt.readthedocs.io/en/latest/usage.html#reading-the-claimset-without-validation
|
|
token = decode(accessKey, settings.SECRET_KEY, algorithms=["HS256"])
|
|
except InvalidTokenError:
|
|
return HttpResponse("", status=403)
|
|
user = "admin" if token.get("username", "") == "admin" else "moderator"
|
|
resp = HttpResponse("", status=200)
|
|
resp["X-FBrowser-User"] = user
|
|
return resp
|