"""Webapp views.""" import json from jwt import decode from jwt.exceptions import InvalidTokenError from django.utils import timezone from django.conf import settings from django.http import HttpResponse, JsonResponse from django.shortcuts import get_object_or_404 from django.views.decorators.http import require_http_methods from django_filters import rest_framework as filters from django.db.models import Prefetch from django.core.exceptions import ObjectDoesNotExist from rest_framework import routers from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet from rest_framework.filters import OrderingFilter, SearchFilter from rest_framework.decorators import action from rest_framework.permissions import ( IsAuthenticatedOrReadOnly, BasePermission, AllowAny, IsAuthenticated, ) from jsonschema import validate from jsonschema.exceptions import ValidationError from webapp.models import * from webapp.serializers import * from webapp.utils import admin_send_email_signupees, decode_base64_file with open("./webapp/questionSchema.json", "r") as file: QUESTION_SCHEMA = json.load(file) class SignupPermission(BasePermission): def has_permission(self, request, view): if request.method == "POST": return True return request.user and request.user.is_authenticated # -- REST API -- # class RootView(routers.APIRootView): permission_classes = [IsAuthenticatedOrReadOnly] class EventViewSet(ModelViewSet): queryset = Event.objects.filter(deleted=False) ordering = ["start_time"] serializer_class = EventSerializer permission_classes = [IsAuthenticatedOrReadOnly] filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter) filter_fields = ("id", "tags", "visible", "signupForm") search_fields = ("id", "tags", "visible", "signupForm") def get_queryset(self): # TODO: For create and update, this return old data in signupForm field (prefetched)... if ( self.request.user.is_authenticated or self.request.method == "POST" or self.request.method == "PUT" ): return Event.objects.filter(deleted=False).prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False), to_attr="filtered_signup_forms", ) ) since = self.request.query_params.get("since", None) if since: return ( Event.objects.filter(deleted=False, visible=True, end_time__gt=since) .order_by("start_time") .prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False, visible=True), to_attr="filtered_signup_forms", ) ) ) return ( Event.objects.filter( deleted=False, visible=True, end_time__gt=timezone.now() ) .order_by("start_time") .prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False, visible=True), to_attr="filtered_signup_forms", ) ) ) def create(self, request, *args, **kwargs): raw_image = request.data.get("image", None) if raw_image is not None: image = decode_base64_file(raw_image) request.data.update({"image": image}) return super().create(request, *args, **kwargs) def update(self, request, *args, **kwargs): raw_image = request.data.get("image", None) if raw_image is not None: image = decode_base64_file(raw_image) request.data.update({"image": image}) return super().update(request, *args, **kwargs) def destroy(self, request, pk=None, *args, **kwargs): try: event = self.get_object() event.deleted = True event.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Event {pk} not found"}) class SignupFormViewSet(ModelViewSet): queryset = SignupForm.objects.filter(deleted=False) ordering = ["start_time"] serializer_class = SignupFormSerializer permission_classes = [IsAuthenticatedOrReadOnly] def create(self, request, *args, **kwargs): try: schema = QUESTION_SCHEMA validate(instance=request.data["questions"], schema=schema) return super().create(request, *args, **kwargs) except ValidationError as err: return JsonResponse(status=400, data={"error": err.message}) def update(self, request, *args, **kwargs): try: schema = QUESTION_SCHEMA validate(instance=request.data["questions"], schema=schema) return super().update(request, *args, **kwargs) except ValidationError as err: return JsonResponse(status=400, data={"error": err.message}) def get_queryset(self): if self.request.user.is_authenticated: return SignupForm.objects.filter(deleted=False).order_by("start_time") return SignupForm.objects.filter(deleted=False, visible=True).order_by( "start_time" ) def destroy(self, request, pk=None, *args, **kwargs): try: form = self.get_object() form.deleted = True form.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {pk} not found"} ) @action(detail=True, methods=["post"], permission_classes=[IsAuthenticated]) def sendemail(self, request, pk=None, *args, **kwargs): subject = request.data["subject"] content = request.data["content"] mode = request.data["mode"] queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk} signupForm = get_object_or_404(queryset, **filter) if mode == "all": admin_send_email_signupees(signupForm.signups, subject, content) return JsonResponse(status=201, data={"message": "Email sent"}) elif mode == "actual": admin_send_email_signupees( signupForm.signups[: signupForm.quota], subject, content ) return JsonResponse(status=201, data={"message": "Email sent"}) elif mode == "reserved": admin_send_email_signupees( signupForm.signups[signupForm.quota :], subject, content ) return JsonResponse(status=201, data={"message": "Email sent"}) else: return JsonResponse(status=400, data={"error": f"Bad mode '{mode}'"}) @action(detail=True, methods=["get"], permission_classes=[IsAuthenticated]) def signups(self, request, pk=None, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk} signupForm = get_object_or_404(queryset, **filter) return Response(SignupSerializer(signupForm.signups, many=True).data) class SignupViewSet(ModelViewSet): queryset = Signup.objects.filter(deleted=False) serializer_class = SignupSerializer permission_classes = [SignupPermission] @action(detail=True, methods=["get", "put"], permission_classes=[AllowAny]) def edit(self, request, pk=None, *args, **kwargs): uuid = request.query_params.get("uuid", None) queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk, "uuid": uuid} get_object_or_404(queryset, **filter) if request.method == "GET": return self.retrieve(request, *args, **kwargs) elif request.method == "PUT": return self.partial_update(request, *args, **kwargs) def create(self, request, *args, **kwargs): id = request.data["signupForm_id"] try: answer = request.data["answer"] form = SignupForm.objects.get(id=id) if form.isOpen: # Throws ValidationError if not valid validate(instance=answer, schema=form.schema) return super().create(request, *args, **kwargs) except ValidationError as inst: return JsonResponse(status=400, data={"error": inst.message}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) else: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) def partial_update(self, request, pk=None, *args, **kwargs): try: # ID & UUID validated in edit @action for normal users. # This is otherwise open for authenticated users. signup = self.get_object() answer = request.data["answer"] form = SignupForm.objects.get(id=signup.signupForm_id) if form.visible: # Throws ValidationError if not valid validate(instance=answer, schema=form.schema) return super().partial_update(request, *args, **kwargs) except ValidationError as inst: return JsonResponse(status=400, data={"error": inst.message}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) else: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) def destroy(self, request, pk=None, *args, **kwargs): try: signup = self.get_object() signup.deleted = True signup.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Signup {pk} not found"}) class SavedQuestionsViewSet(ModelViewSet): queryset = TemplateQuestion.objects.filter(deleted=False) serializer_class = SavedQuestionsSerializer permission_classes = [IsAuthenticatedOrReadOnly] def destroy(self, request, pk=None, *args, **kwargs): try: question = self.get_object() question.deleted = True question.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"Template question {pk} not found"} ) class FeedViewSet(ModelViewSet): queryset = Feed.objects.filter(deleted=False) serializer_class = FeedSerializer permission_classes = [IsAuthenticatedOrReadOnly] filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter) filter_fields = ("id", "tags", "visible") search_fields = ("id", "tags", "visible") def get_queryset(self): if self.request.user.is_authenticated: return Feed.objects.filter(deleted=False).order_by("-publish_time") else: objs = Feed.objects.filter(deleted=False, visible=True).order_by( "-publish_time" ) # TODO: Bad filtering. Rewrite! result_ids = [] for obj in objs: if obj.autohide_enabled: if obj.autohide > timezone.now(): result_ids.append(obj.id) else: result_ids.append(obj.id) return Feed.objects.filter(id__in=result_ids).order_by("-publish_time") def destroy(self, request, pk=None, *args, **kwargs): try: post = self.get_object() post.deleted = True post.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Post {pk} not found"}) class TagsViewSet(ReadOnlyModelViewSet): queryset = Tag.objects.all() serializer_class = TagSerializer permission_classes = [IsAuthenticatedOrReadOnly] class JobAdViewSet(ModelViewSet): queryset = JobAd.objects.filter(deleted=False) serializer_class = JobAdSerializer permission_classes = [IsAuthenticatedOrReadOnly] def get_queryset(self): if self.request.user.is_authenticated: return JobAd.objects.filter(deleted=False) return JobAd.objects.filter( deleted=False, visible=True, autohide_at__gt=timezone.now() ) def destroy(self, request, pk=None, *args, **kwargs): try: ad = self.get_object() ad.deleted = True ad.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Job Ad {pk} not found"}) @require_http_methods(["GET"]) def nginx_jwt_resp(request, *args, **kwargs): accessKey = request.COOKIES.get("jwt_access", None) if not accessKey: return HttpResponse("", status=401) try: # This also verifies the signature. # See https://pyjwt.readthedocs.io/en/latest/usage.html#reading-the-claimset-without-validation token = decode(accessKey, settings.SECRET_KEY, algorithms=["HS256"]) except InvalidTokenError: return HttpResponse("", status=403) user = "admin" if token.get("username", "") == "admin" else "moderator" resp = HttpResponse("", status=200) resp["X-FBrowser-User"] = user return resp