"""Webapp views.""" import json import time from jwt import decode from jwt.exceptions import InvalidTokenError from django.utils import timezone from django.conf import settings from django.http import HttpResponse, JsonResponse from django.shortcuts import get_object_or_404 from django.views.decorators.http import require_http_methods from django_filters import rest_framework as filters from django.db.models import Prefetch from django.core.exceptions import ObjectDoesNotExist from django.db.utils import IntegrityError from rest_framework import routers from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet from rest_framework.filters import OrderingFilter, SearchFilter from rest_framework.decorators import action from rest_framework.permissions import ( IsAuthenticatedOrReadOnly, BasePermission, AllowAny, IsAuthenticated, ) from jsonschema import validate from jsonschema.exceptions import ValidationError from webapp.models import * from webapp.serializers import * from webapp.utils import admin_send_email_signupees, decode_base64_file with open("./webapp/questionSchema.json", "r") as file: QUESTION_SCHEMA = json.load(file) class SignupPermission(BasePermission): def has_permission(self, request, view): if request.method == "POST": return True return request.user and request.user.is_authenticated # -- REST API -- # class RootView(routers.APIRootView): permission_classes = [IsAuthenticatedOrReadOnly] class EventViewSet(ModelViewSet): queryset = Event.objects.filter(deleted=False) ordering = ["start_time"] serializer_class = EventSerializer permission_classes = [IsAuthenticatedOrReadOnly] filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter) filter_fields = ("id", "tags", "visible", "signupForm") search_fields = ("id", "tags", "visible", "signupForm") def get_queryset(self): # TODO: For create and update, this return old data in signupForm field (prefetched)... if ( self.request.user.is_authenticated or self.request.method == "POST" or self.request.method == "PUT" ): return Event.objects.filter(deleted=False).prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False), to_attr="filtered_signup_forms", ) ) since = self.request.query_params.get("since", None) if since: return ( Event.objects.filter(deleted=False, visible=True, end_time__gt=since) .order_by("start_time") .prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False, visible=True), to_attr="filtered_signup_forms", ) ) ) return ( Event.objects.filter( deleted=False, visible=True, end_time__gt=timezone.now() ) .order_by("start_time") .prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False, visible=True), to_attr="filtered_signup_forms", ) ) ) def create(self, request, *args, **kwargs): raw_image = request.data.get("image", None) if raw_image is not None: image = decode_base64_file(raw_image) request.data.update({"image": image}) return super().create(request, *args, **kwargs) def update(self, request, *args, **kwargs): raw_image = request.data.get("image", None) if raw_image is not None: image = decode_base64_file(raw_image) request.data.update({"image": image}) return super().update(request, *args, **kwargs) def destroy(self, request, pk=None, *args, **kwargs): try: event = self.get_object() event.deleted = True event.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Event {pk} not found"}) class SignupFormViewSet(ModelViewSet): queryset = SignupForm.objects.filter(deleted=False) ordering = ["start_time"] serializer_class = SignupFormSerializer permission_classes = [IsAuthenticatedOrReadOnly] def create(self, request, *args, **kwargs): try: schema = QUESTION_SCHEMA validate(instance=request.data["questions"], schema=schema) return super().create(request, *args, **kwargs) except ValidationError as err: return JsonResponse(status=400, data={"error": err.message}) def update(self, request, *args, **kwargs): try: schema = QUESTION_SCHEMA validate(instance=request.data["questions"], schema=schema) return super().update(request, *args, **kwargs) except ValidationError as err: return JsonResponse(status=400, data={"error": err.message}) def get_queryset(self): if self.request.user.is_authenticated: return SignupForm.objects.filter(deleted=False).order_by("start_time") return SignupForm.objects.filter(deleted=False, visible=True).order_by( "start_time" ) def destroy(self, request, pk=None, *args, **kwargs): try: form = self.get_object() form.deleted = True form.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {pk} not found"} ) @action(detail=True, methods=["post"], permission_classes=[IsAuthenticated]) def sendemail(self, request, pk=None, *args, **kwargs): subject = request.data["subject"] content = request.data["content"] mode = request.data["mode"] queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk} signupForm = get_object_or_404(queryset, **filter) if mode == "all": admin_send_email_signupees(signupForm.signups, subject, content) return JsonResponse(status=201, data={"message": "Email sent"}) elif mode == "actual": admin_send_email_signupees( signupForm.signups[: signupForm.quota], subject, content ) return JsonResponse(status=201, data={"message": "Email sent"}) elif mode == "reserved": admin_send_email_signupees( signupForm.signups[signupForm.quota :], subject, content ) return JsonResponse(status=201, data={"message": "Email sent"}) else: return JsonResponse(status=400, data={"error": f"Bad mode '{mode}'"}) @action(detail=True, methods=["get"], permission_classes=[IsAuthenticated]) def signups(self, request, pk=None, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk} signupForm = get_object_or_404(queryset, **filter) return Response(SignupSerializer(signupForm.signups, many=True).data) class SignupViewSet(ModelViewSet): queryset = Signup.objects.filter(deleted=False) serializer_class = SignupSerializer permission_classes = [SignupPermission] @action(detail=True, methods=["get", "put"], permission_classes=[AllowAny]) def edit(self, request, pk=None, *args, **kwargs): uuid = request.query_params.get("uuid", None) queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk, "uuid": uuid} get_object_or_404(queryset, **filter) if request.method == "GET": return self.retrieve(request, *args, **kwargs) elif request.method == "PUT": return self.partial_update(request, *args, **kwargs) def create(self, request, *args, **kwargs): # Temporary manual duplicate check as submit_id uniqueness is not enforced in deployment database if "submit_id" in request.data and Signup.objects.filter( submit_id=request.data["submit_id"] ): return JsonResponse( status=200, data={"message": "The submission has already been received"} ) id = request.data["signupForm_id"] try: answer = request.data["answer"] form = SignupForm.objects.get(id=id) if form.isOpen: # Throws ValidationError if not valid validate(instance=answer, schema=form.schema) return super().create(request, *args, **kwargs) except ValidationError as inst: return JsonResponse(status=400, data={"error": inst.message}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) except IntegrityError: return JsonResponse( status=200, data={"message": "The submission has already been received"} ) else: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) def partial_update(self, request, pk=None, *args, **kwargs): try: # ID & UUID validated in edit @action for normal users. # This is otherwise open for authenticated users. signup = self.get_object() answer = request.data["answer"] form = SignupForm.objects.get(id=signup.signupForm_id) if form.visible: # Throws ValidationError if not valid validate(instance=answer, schema=form.schema) return super().partial_update(request, *args, **kwargs) except ValidationError as inst: return JsonResponse(status=400, data={"error": inst.message}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) else: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) def destroy(self, request, pk=None, *args, **kwargs): try: signup = self.get_object() signup.deleted = True signup.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Signup {pk} not found"}) class SavedQuestionsViewSet(ModelViewSet): queryset = TemplateQuestion.objects.filter(deleted=False) serializer_class = SavedQuestionsSerializer permission_classes = [IsAuthenticatedOrReadOnly] def destroy(self, request, pk=None, *args, **kwargs): try: question = self.get_object() question.deleted = True question.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"Template question {pk} not found"} ) class FeedViewSet(ModelViewSet): queryset = Feed.objects.filter(deleted=False) serializer_class = FeedSerializer permission_classes = [IsAuthenticatedOrReadOnly] filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter) filter_fields = ("id", "tags", "visible") search_fields = ("id", "tags", "visible") def get_queryset(self): if self.request.user.is_authenticated: return Feed.objects.filter(deleted=False).order_by("-publish_time") else: objs = Feed.objects.filter(deleted=False, visible=True).order_by( "-publish_time" ) # TODO: Bad filtering. Rewrite! result_ids = [] for obj in objs: if obj.autohide_enabled: if obj.autohide > timezone.now(): result_ids.append(obj.id) else: result_ids.append(obj.id) return Feed.objects.filter(id__in=result_ids).order_by("-publish_time") def destroy(self, request, pk=None, *args, **kwargs): try: post = self.get_object() post.deleted = True post.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Post {pk} not found"}) class TagsViewSet(ReadOnlyModelViewSet): queryset = Tag.objects.all() serializer_class = TagSerializer permission_classes = [IsAuthenticatedOrReadOnly] class JobAdViewSet(ModelViewSet): queryset = JobAd.objects.filter(deleted=False) serializer_class = JobAdSerializer permission_classes = [IsAuthenticatedOrReadOnly] def get_queryset(self): if self.request.user.is_authenticated: return JobAd.objects.filter(deleted=False) return JobAd.objects.filter( deleted=False, visible=True, autohide_at__gt=timezone.now() ) def destroy(self, request, pk=None, *args, **kwargs): try: ad = self.get_object() ad.deleted = True ad.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Job Ad {pk} not found"}) @require_http_methods(["GET"]) def nginx_jwt_resp(request, *args, **kwargs): accessKey = request.COOKIES.get("jwt_access", None) if not accessKey: return HttpResponse("", status=401) try: # This also verifies the signature. # See https://pyjwt.readthedocs.io/en/latest/usage.html#reading-the-claimset-without-validation token = decode(accessKey, settings.SECRET_KEY, algorithms=["HS256"]) except InvalidTokenError: return HttpResponse("", status=403) user = "admin" if token.get("username", "") == "admin" else "moderator" resp = HttpResponse("", status=200) resp["X-FBrowser-User"] = user return resp