"""Webapp views.""" import json import time from jwt import decode from jwt.exceptions import InvalidTokenError from django.utils import timezone from django.conf import settings from django.http import HttpResponse, JsonResponse from django.shortcuts import get_object_or_404 from django.views.decorators.http import require_http_methods from django_filters import rest_framework as filters from django.db.models import Prefetch from django.core.exceptions import ObjectDoesNotExist from rest_framework import routers from rest_framework.response import Response from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet from rest_framework.filters import OrderingFilter, SearchFilter from rest_framework.decorators import action from rest_framework.permissions import ( IsAuthenticatedOrReadOnly, BasePermission, AllowAny, IsAuthenticated, ) from jsonschema import validate from jsonschema.exceptions import ValidationError from webapp.models import * from webapp.serializers import * from webapp.utils import admin_send_email_signupees, decode_base64_file with open("./webapp/questionSchema.json", "r") as file: QUESTION_SCHEMA = json.load(file) class SignupPermission(BasePermission): def has_permission(self, request, view): if request.method == "POST": return True return request.user and request.user.is_authenticated # -- REST API -- # class RootView(routers.APIRootView): permission_classes = [IsAuthenticatedOrReadOnly] class EventViewSet(ModelViewSet): queryset = Event.objects.filter(deleted=False) ordering = ["start_time"] serializer_class = EventSerializer permission_classes = [IsAuthenticatedOrReadOnly] filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter) filter_fields = ("id", "tags", "visible", "signupForm") search_fields = ("id", "tags", "visible", "signupForm") def get_queryset(self): # TODO: For create and update, this return old data in signupForm field (prefetched)... if ( self.request.user.is_authenticated or self.request.method == "POST" or self.request.method == "PUT" ): return Event.objects.filter(deleted=False).prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False), to_attr="filtered_signup_forms", ) ) since = self.request.query_params.get("since", None) if since: return ( Event.objects.filter(deleted=False, visible=True, end_time__gt=since) .order_by("start_time") .prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False, visible=True), to_attr="filtered_signup_forms", ) ) ) return ( Event.objects.filter( deleted=False, visible=True, end_time__gt=timezone.now() ) .order_by("start_time") .prefetch_related( Prefetch( "signupForm", queryset=SignupForm.objects.filter(deleted=False, visible=True), to_attr="filtered_signup_forms", ) ) ) def create(self, request, *args, **kwargs): raw_image = request.data.get("image", None) if raw_image is not None: image = decode_base64_file(raw_image) request.data.update({"image": image}) return super().create(request, *args, **kwargs) def update(self, request, *args, **kwargs): raw_image = request.data.get("image", None) if raw_image is not None: image = decode_base64_file(raw_image) request.data.update({"image": image}) return super().update(request, *args, **kwargs) def destroy(self, request, pk=None, *args, **kwargs): try: event = self.get_object() event.deleted = True event.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Event {pk} not found"}) class SignupFormViewSet(ModelViewSet): queryset = SignupForm.objects.filter(deleted=False) ordering = ["start_time"] serializer_class = SignupFormSerializer permission_classes = [IsAuthenticatedOrReadOnly] def create(self, request, *args, **kwargs): try: schema = QUESTION_SCHEMA validate(instance=request.data["questions"], schema=schema) return super().create(request, *args, **kwargs) except ValidationError as err: return JsonResponse(status=400, data={"error": err.message}) def update(self, request, *args, **kwargs): try: schema = QUESTION_SCHEMA validate(instance=request.data["questions"], schema=schema) return super().update(request, *args, **kwargs) except ValidationError as err: return JsonResponse(status=400, data={"error": err.message}) def get_queryset(self): if self.request.user.is_authenticated: return SignupForm.objects.filter(deleted=False).order_by("start_time") return SignupForm.objects.filter(deleted=False, visible=True).order_by( "start_time" ) def destroy(self, request, pk=None, *args, **kwargs): try: form = self.get_object() form.deleted = True form.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {pk} not found"} ) @action(detail=True, methods=["post"], permission_classes=[IsAuthenticated]) def sendemail(self, request, pk=None, *args, **kwargs): subject = request.data["subject"] content = request.data["content"] mode = request.data["mode"] queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk} signupForm = get_object_or_404(queryset, **filter) if mode == "all": admin_send_email_signupees(signupForm.signups, subject, content) return JsonResponse(status=201, data={"message": "Email sent"}) elif mode == "actual": admin_send_email_signupees( signupForm.signups[: signupForm.quota], subject, content ) return JsonResponse(status=201, data={"message": "Email sent"}) elif mode == "reserved": admin_send_email_signupees( signupForm.signups[signupForm.quota :], subject, content ) return JsonResponse(status=201, data={"message": "Email sent"}) else: return JsonResponse(status=400, data={"error": f"Bad mode '{mode}'"}) @action(detail=True, methods=["get"], permission_classes=[IsAuthenticated]) def signups(self, request, pk=None, *args, **kwargs): queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk} signupForm = get_object_or_404(queryset, **filter) return Response(SignupSerializer(signupForm.signups, many=True).data) class SignupViewSet(ModelViewSet): queryset = Signup.objects.filter(deleted=False) serializer_class = SignupSerializer permission_classes = [SignupPermission] submit_keys = ( {} ) # Dictionary for currently invalid submission keys; {key: timestamp} def key_is_unique(self, submitKey): current_time = time.time() printf("Checking", key) # Remove expired keys from dict (older than 1 h) # A key that expires as the function is called is considered valid printf("Keys", submit_keys) SignupViewSet.submit_keys = { key: time for key, time in SignupViewSet.submit_keys.items() if time + 3600 >= current_time } printf("After", submit_keys) if submitKey not in SignupViewSet.submit_keys: # Key is unique; valid SignupViewSet.submit_keys[submitKey] = current_time return True else: # Key is not unique; invalid, refresh timestamp SignupViewSet.submit_keys[submitKey] = current_time return False @action(detail=True, methods=["get", "put"], permission_classes=[AllowAny]) def edit(self, request, pk=None, *args, **kwargs): uuid = request.query_params.get("uuid", None) queryset = self.filter_queryset(self.get_queryset()) filter = {"pk": pk, "uuid": uuid} get_object_or_404(queryset, **filter) if request.method == "GET": return self.retrieve(request, *args, **kwargs) elif request.method == "PUT": return self.partial_update(request, *args, **kwargs) def create(self, request, *args, **kwargs): id = request.data["signupForm_id"] try: submitKey = request.data.get("submitKey") if submitKey is not None and not self.key_is_unique(submitKey): return JsonResponse( status=200, data={"message": "Ignored repeated request"} ) answer = request.data["answer"] form = SignupForm.objects.get(id=id) if form.isOpen: # Throws ValidationError if not valid validate(instance=answer, schema=form.schema) return super().create(request, *args, **kwargs) except ValidationError as inst: return JsonResponse(status=400, data={"error": inst.message}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) else: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) def partial_update(self, request, pk=None, *args, **kwargs): try: # ID & UUID validated in edit @action for normal users. # This is otherwise open for authenticated users. signup = self.get_object() answer = request.data["answer"] form = SignupForm.objects.get(id=signup.signupForm_id) if form.visible: # Throws ValidationError if not valid validate(instance=answer, schema=form.schema) return super().partial_update(request, *args, **kwargs) except ValidationError as inst: return JsonResponse(status=400, data={"error": inst.message}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) else: return JsonResponse( status=404, data={"error": f"SignupForm {id} not found"} ) def destroy(self, request, pk=None, *args, **kwargs): try: signup = self.get_object() signup.deleted = True signup.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Signup {pk} not found"}) class SavedQuestionsViewSet(ModelViewSet): queryset = TemplateQuestion.objects.filter(deleted=False) serializer_class = SavedQuestionsSerializer permission_classes = [IsAuthenticatedOrReadOnly] def destroy(self, request, pk=None, *args, **kwargs): try: question = self.get_object() question.deleted = True question.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse( status=404, data={"error": f"Template question {pk} not found"} ) class FeedViewSet(ModelViewSet): queryset = Feed.objects.filter(deleted=False) serializer_class = FeedSerializer permission_classes = [IsAuthenticatedOrReadOnly] filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter) filter_fields = ("id", "tags", "visible") search_fields = ("id", "tags", "visible") def get_queryset(self): if self.request.user.is_authenticated: return Feed.objects.filter(deleted=False).order_by("-publish_time") else: objs = Feed.objects.filter(deleted=False, visible=True).order_by( "-publish_time" ) # TODO: Bad filtering. Rewrite! result_ids = [] for obj in objs: if obj.autohide_enabled: if obj.autohide > timezone.now(): result_ids.append(obj.id) else: result_ids.append(obj.id) return Feed.objects.filter(id__in=result_ids).order_by("-publish_time") def destroy(self, request, pk=None, *args, **kwargs): try: post = self.get_object() post.deleted = True post.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Post {pk} not found"}) class TagsViewSet(ReadOnlyModelViewSet): queryset = Tag.objects.all() serializer_class = TagSerializer permission_classes = [IsAuthenticatedOrReadOnly] class JobAdViewSet(ModelViewSet): queryset = JobAd.objects.filter(deleted=False) serializer_class = JobAdSerializer permission_classes = [IsAuthenticatedOrReadOnly] def get_queryset(self): if self.request.user.is_authenticated: return JobAd.objects.filter(deleted=False) return JobAd.objects.filter( deleted=False, visible=True, autohide_at__gt=timezone.now() ) def destroy(self, request, pk=None, *args, **kwargs): try: ad = self.get_object() ad.deleted = True ad.save() return JsonResponse(status=200, data={"message": "OK"}) except ObjectDoesNotExist: return JsonResponse(status=404, data={"error": f"Job Ad {pk} not found"}) @require_http_methods(["GET"]) def nginx_jwt_resp(request, *args, **kwargs): accessKey = request.COOKIES.get("jwt_access", None) if not accessKey: return HttpResponse("", status=401) try: # This also verifies the signature. # See https://pyjwt.readthedocs.io/en/latest/usage.html#reading-the-claimset-without-validation token = decode(accessKey, settings.SECRET_KEY, algorithms=["HS256"]) except InvalidTokenError: return HttpResponse("", status=403) user = "admin" if token.get("username", "") == "admin" else "moderator" resp = HttpResponse("", status=200) resp["X-FBrowser-User"] = user return resp