Files
web2.0-backend/webapp/views.py
T
2025-09-16 21:24:52 +03:00

390 lines
15 KiB
Python

"""Webapp views."""
import json
import time

from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError as DjangoValidationError
from django.db.models import Prefetch
from django.http import Http404, HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.views.decorators.http import require_http_methods
from django_filters import rest_framework as filters
from jsonschema import validate
from jsonschema.exceptions import ValidationError
from jwt import decode
from jwt.exceptions import InvalidTokenError
from rest_framework import routers
from rest_framework.decorators import action
from rest_framework.exceptions import ParseError
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import (
    AllowAny,
    BasePermission,
    IsAuthenticated,
    IsAuthenticatedOrReadOnly,
)
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet

from webapp.models import *
from webapp.serializers import *
from webapp.utils import admin_send_email_signupees, decode_base64_file
# JSON schema used to validate the "questions" payload of signup forms.
# The path is relative to the process working directory, so the module must be
# imported with the project root as the current directory.
# The encoding is explicit so the load does not depend on the locale default.
with open("./webapp/questionSchema.json", "r", encoding="utf-8") as schema_file:
    QUESTION_SCHEMA = json.load(schema_file)
class SignupPermission(BasePermission):
    """Let anyone POST; require an authenticated user for every other method."""

    def has_permission(self, request, view):
        """Return True when the request is allowed to reach the view.

        POST requests are always allowed. Any other method is allowed only
        when ``request.user`` is set and authenticated.
        """
        if request.method == "POST":
            return True
        # Coerce to bool so the result is never a user object or None.
        return bool(request.user and request.user.is_authenticated)
# -- REST API -- #
class RootView(routers.APIRootView):
    """API root view guarded by ``IsAuthenticatedOrReadOnly``."""

    permission_classes = [IsAuthenticatedOrReadOnly]
class EventViewSet(ModelViewSet):
    """CRUD endpoints for events, with soft delete.

    Authenticated users see every non-deleted event. Anonymous users only see
    visible events whose ``end_time`` is later than the ``since`` query
    parameter, or later than the current time when ``since`` is not given.
    """

    queryset = Event.objects.filter(deleted=False)
    ordering = ["start_time"]
    serializer_class = EventSerializer
    permission_classes = [IsAuthenticatedOrReadOnly]
    filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter)
    filter_fields = ("id", "tags", "visible", "signupForm")
    search_fields = ("id", "tags", "visible", "signupForm")

    @staticmethod
    def _prefetch_signup_forms(events, **form_filters):
        """Attach non-deleted signup forms to ``events`` as ``filtered_signup_forms``.

        ``form_filters`` are extra filter arguments applied to the prefetched
        ``SignupForm`` queryset (for example ``visible=True``).
        """
        return events.prefetch_related(
            Prefetch(
                "signupForm",
                queryset=SignupForm.objects.filter(deleted=False, **form_filters),
                to_attr="filtered_signup_forms",
            )
        )

    @staticmethod
    def _decode_image(request):
        """Replace a base64 ``image`` value in ``request.data`` with a decoded file.

        Does nothing when the request has no ``image`` value.
        """
        raw_image = request.data.get("image", None)
        if raw_image is not None:
            # NOTE(review): assumes request.data is a mutable mapping (e.g. a
            # JSON body) -- confirm for form-encoded requests.
            request.data.update({"image": decode_base64_file(raw_image)})

    def get_queryset(self):
        """Return the events the current request is allowed to see.

        Raises:
            ParseError: if the ``since`` query parameter cannot be used as a
                value for the ``end_time`` filter (results in a 400 response).
        """
        # TODO: For create and update, this return old data in signupForm field (prefetched)...
        if self.request.user.is_authenticated or self.request.method in (
            "POST",
            "PUT",
        ):
            return self._prefetch_signup_forms(Event.objects.filter(deleted=False))

        since = self.request.query_params.get("since", None)
        cutoff = since if since else timezone.now()
        try:
            # The lookup value is converted when the filter is built, so a
            # malformed ``since`` fails here rather than at evaluation time.
            public_events = Event.objects.filter(
                deleted=False, visible=True, end_time__gt=cutoff
            )
        except (DjangoValidationError, ValueError) as err:
            raise ParseError(f"Invalid 'since' value '{since}'") from err
        return self._prefetch_signup_forms(
            public_events.order_by("start_time"), visible=True
        )

    def create(self, request, *args, **kwargs):
        """Create an event, decoding a base64 ``image`` value first."""
        self._decode_image(request)
        return super().create(request, *args, **kwargs)

    def update(self, request, *args, **kwargs):
        """Update an event, decoding a base64 ``image`` value first."""
        self._decode_image(request)
        return super().update(request, *args, **kwargs)

    def destroy(self, request, pk=None, *args, **kwargs):
        """Soft-delete the event by setting its ``deleted`` flag."""
        try:
            event = self.get_object()
            event.deleted = True
            event.save()
            return JsonResponse(status=200, data={"message": "OK"})
        except ObjectDoesNotExist:
            return JsonResponse(status=404, data={"error": f"Event {pk} not found"})
class SignupFormViewSet(ModelViewSet):
    """CRUD endpoints for signup forms, with soft delete and admin actions.

    The ``questions`` payload is validated against ``QUESTION_SCHEMA`` before
    a form is created or updated.
    """

    queryset = SignupForm.objects.filter(deleted=False)
    ordering = ["start_time"]
    serializer_class = SignupFormSerializer
    permission_classes = [IsAuthenticatedOrReadOnly]

    def create(self, request, *args, **kwargs):
        """Create a signup form after validating its ``questions``.

        Returns a 400 response when ``questions`` is missing or does not
        match the schema.
        """
        if "questions" not in request.data:
            return JsonResponse(
                status=400, data={"error": "Missing field 'questions'"}
            )
        try:
            validate(instance=request.data["questions"], schema=QUESTION_SCHEMA)
            return super().create(request, *args, **kwargs)
        except ValidationError as err:
            return JsonResponse(status=400, data={"error": err.message})

    def update(self, request, *args, **kwargs):
        """Update a signup form, validating ``questions`` when it is sent.

        A partial update (PATCH, which arrives here with ``partial=True``)
        may omit ``questions``; a full update without it gets a 400 response.
        """
        has_questions = "questions" in request.data
        if not has_questions and not kwargs.get("partial", False):
            return JsonResponse(
                status=400, data={"error": "Missing field 'questions'"}
            )
        try:
            if has_questions:
                validate(instance=request.data["questions"], schema=QUESTION_SCHEMA)
            return super().update(request, *args, **kwargs)
        except ValidationError as err:
            return JsonResponse(status=400, data={"error": err.message})

    def get_queryset(self):
        """Return non-deleted forms; anonymous users only get visible ones."""
        if self.request.user.is_authenticated:
            return SignupForm.objects.filter(deleted=False).order_by("start_time")
        return SignupForm.objects.filter(deleted=False, visible=True).order_by(
            "start_time"
        )

    def destroy(self, request, pk=None, *args, **kwargs):
        """Soft-delete the form by setting its ``deleted`` flag."""
        try:
            form = self.get_object()
            form.deleted = True
            form.save()
            return JsonResponse(status=200, data={"message": "OK"})
        except ObjectDoesNotExist:
            return JsonResponse(
                status=404, data={"error": f"SignupForm {pk} not found"}
            )

    @action(detail=True, methods=["post"], permission_classes=[IsAuthenticated])
    def sendemail(self, request, pk=None, *args, **kwargs):
        """Email the signupees of this form.

        The request body must contain ``subject``, ``content`` and ``mode``.
        ``mode`` selects the recipients: ``"all"`` for every signup,
        ``"actual"`` for the first ``quota`` signups and ``"reserved"`` for
        the signups after the first ``quota``.
        """
        missing = [
            field for field in ("subject", "content", "mode")
            if field not in request.data
        ]
        if missing:
            return JsonResponse(
                status=400,
                data={"error": f"Missing field(s): {', '.join(missing)}"},
            )
        subject = request.data["subject"]
        content = request.data["content"]
        mode = request.data["mode"]

        queryset = self.filter_queryset(self.get_queryset())
        signup_form = get_object_or_404(queryset, pk=pk)

        if mode == "all":
            recipients = signup_form.signups
        elif mode == "actual":
            recipients = signup_form.signups[: signup_form.quota]
        elif mode == "reserved":
            recipients = signup_form.signups[signup_form.quota :]
        else:
            return JsonResponse(status=400, data={"error": f"Bad mode '{mode}'"})

        admin_send_email_signupees(recipients, subject, content)
        return JsonResponse(status=201, data={"message": "Email sent"})

    @action(detail=True, methods=["get"], permission_classes=[IsAuthenticated])
    def signups(self, request, pk=None, *args, **kwargs):
        """Return the serialized signups of this form."""
        queryset = self.filter_queryset(self.get_queryset())
        signup_form = get_object_or_404(queryset, pk=pk)
        return Response(SignupSerializer(signup_form.signups, many=True).data)
class SignupViewSet(ModelViewSet):
    """Endpoints for signups: public creation, uuid-guarded editing, soft delete."""

    queryset = Signup.objects.filter(deleted=False)
    serializer_class = SignupSerializer
    permission_classes = [SignupPermission]
    # Dictionary for currently invalid submission keys; {key: timestamp}.
    # It is a class attribute, so it is shared by every instance in the same
    # process.
    submit_keys = {}

    def key_is_unique(self, submitKey):
        """Record ``submitKey`` and report whether it was seen in the last hour.

        Returns True when the key was not already stored, False otherwise.
        The stored timestamp is refreshed in both cases.
        """
        now = time.time()
        # Remove expired keys from dict (older than 1 h)
        # A key that expires as the function is called is considered valid
        SignupViewSet.submit_keys = {
            key: seen_at
            for key, seen_at in SignupViewSet.submit_keys.items()
            if seen_at + 3600 >= now
        }
        is_new = submitKey not in SignupViewSet.submit_keys
        SignupViewSet.submit_keys[submitKey] = now
        return is_new

    @action(detail=True, methods=["get", "put"], permission_classes=[AllowAny])
    def edit(self, request, pk=None, *args, **kwargs):
        """Retrieve or update a signup identified by ``pk`` and a ``uuid`` parameter.

        Raises:
            Http404: if ``uuid`` is missing, malformed, or does not match the
                signup with this ``pk``.
        """
        uuid = request.query_params.get("uuid", None)
        if uuid is None:
            # Without this guard the lookup below would become ``uuid IS NULL``.
            raise Http404
        queryset = self.filter_queryset(self.get_queryset())
        try:
            get_object_or_404(queryset, pk=pk, uuid=uuid)
        except (TypeError, ValueError, DjangoValidationError) as err:
            # A malformed pk/uuid cannot match anything; answer 404, not 500.
            raise Http404 from err
        if request.method == "PUT":
            return self.partial_update(request, *args, **kwargs)
        # GET, and HEAD which is routed to the same handler.
        return self.retrieve(request, *args, **kwargs)

    def create(self, request, *args, **kwargs):
        """Create a signup for an open form after validating the answer.

        A request whose ``submitKey`` was already seen is acknowledged with a
        200 response and otherwise ignored. Missing ``signupForm_id`` or
        ``answer`` gives a 400 response; an unknown or closed form gives 404.
        """
        if "signupForm_id" not in request.data:
            return JsonResponse(
                status=400, data={"error": "Missing field 'signupForm_id'"}
            )
        form_id = request.data["signupForm_id"]

        submit_key = request.data.get("submitKey")
        if submit_key is not None and not self.key_is_unique(submit_key):
            return JsonResponse(
                status=200, data={"message": "Ignored repeated request"}
            )

        if "answer" not in request.data:
            return JsonResponse(status=400, data={"error": "Missing field 'answer'"})
        answer = request.data["answer"]

        try:
            form = SignupForm.objects.get(id=form_id)
            if not form.isOpen:
                return JsonResponse(
                    status=404, data={"error": f"SignupForm {form_id} not found"}
                )
            # Throws ValidationError if not valid
            validate(instance=answer, schema=form.schema)
            return super().create(request, *args, **kwargs)
        except ValidationError as err:
            return JsonResponse(status=400, data={"error": err.message})
        except ObjectDoesNotExist:
            return JsonResponse(
                status=404, data={"error": f"SignupForm {form_id} not found"}
            )

    def partial_update(self, request, pk=None, *args, **kwargs):
        """Update a signup's answer after validating it against the form schema.

        Missing ``answer`` gives a 400 response; an unknown or non-visible
        form gives 404.
        """
        # ID & UUID validated in edit @action for normal users.
        # This is otherwise open for authenticated users.
        signup = self.get_object()
        form_id = signup.signupForm_id

        if "answer" not in request.data:
            return JsonResponse(status=400, data={"error": "Missing field 'answer'"})
        answer = request.data["answer"]

        try:
            form = SignupForm.objects.get(id=form_id)
            if not form.visible:
                return JsonResponse(
                    status=404, data={"error": f"SignupForm {form_id} not found"}
                )
            # Throws ValidationError if not valid
            validate(instance=answer, schema=form.schema)
            return super().partial_update(request, *args, **kwargs)
        except ValidationError as err:
            return JsonResponse(status=400, data={"error": err.message})
        except ObjectDoesNotExist:
            return JsonResponse(
                status=404, data={"error": f"SignupForm {form_id} not found"}
            )

    def destroy(self, request, pk=None, *args, **kwargs):
        """Soft-delete the signup by setting its ``deleted`` flag."""
        try:
            signup = self.get_object()
            signup.deleted = True
            signup.save()
            return JsonResponse(status=200, data={"message": "OK"})
        except ObjectDoesNotExist:
            return JsonResponse(status=404, data={"error": f"Signup {pk} not found"})
class SavedQuestionsViewSet(ModelViewSet):
    """CRUD endpoints for non-deleted template questions, with soft delete."""

    queryset = TemplateQuestion.objects.filter(deleted=False)
    serializer_class = SavedQuestionsSerializer
    permission_classes = [IsAuthenticatedOrReadOnly]

    def destroy(self, request, pk=None, *args, **kwargs):
        """Flag the template question as deleted instead of removing the row."""
        try:
            template_question = self.get_object()
            template_question.deleted = True
            template_question.save()
        except ObjectDoesNotExist:
            not_found = {"error": f"Template question {pk} not found"}
            return JsonResponse(status=404, data=not_found)
        else:
            return JsonResponse(status=200, data={"message": "OK"})
class FeedViewSet(ModelViewSet):
    """CRUD endpoints for feed posts, newest first, with soft delete."""

    queryset = Feed.objects.filter(deleted=False)
    serializer_class = FeedSerializer
    permission_classes = [IsAuthenticatedOrReadOnly]
    filter_backends = (filters.DjangoFilterBackend, SearchFilter, OrderingFilter)
    filter_fields = ("id", "tags", "visible")
    search_fields = ("id", "tags", "visible")

    def get_queryset(self):
        """Return the posts the current request is allowed to see.

        Authenticated users get every non-deleted post. Anonymous users get
        visible posts that either have autohide disabled or whose autohide
        time is still in the future.
        """
        if self.request.user.is_authenticated:
            return Feed.objects.filter(deleted=False).order_by("-publish_time")

        public_posts = Feed.objects.filter(deleted=False, visible=True).order_by(
            "-publish_time"
        )
        # TODO: Bad filtering. Rewrite!
        # NOTE(review): this loads every candidate post into Python; if
        # autohide_enabled and autohide are database columns this could be a
        # single query -- confirm against the model.
        shown_ids = [
            post.id
            for post in public_posts
            if not post.autohide_enabled or post.autohide > timezone.now()
        ]
        return Feed.objects.filter(id__in=shown_ids).order_by("-publish_time")

    def destroy(self, request, pk=None, *args, **kwargs):
        """Flag the post as deleted instead of removing the row."""
        try:
            feed_post = self.get_object()
            feed_post.deleted = True
            feed_post.save()
        except ObjectDoesNotExist:
            return JsonResponse(status=404, data={"error": f"Post {pk} not found"})
        else:
            return JsonResponse(status=200, data={"message": "OK"})
class TagsViewSet(ReadOnlyModelViewSet):
    """Read-only endpoints listing every tag."""

    queryset = Tag.objects.all()
    serializer_class = TagSerializer
    permission_classes = [IsAuthenticatedOrReadOnly]
class JobAdViewSet(ModelViewSet):
    """CRUD endpoints for job ads, with soft delete."""

    queryset = JobAd.objects.filter(deleted=False)
    serializer_class = JobAdSerializer
    permission_classes = [IsAuthenticatedOrReadOnly]

    def get_queryset(self):
        """Return the ads the current request is allowed to see.

        Authenticated users get every non-deleted ad. Anonymous users get
        visible ads whose ``autohide_at`` is later than the current time.
        """
        if not self.request.user.is_authenticated:
            return JobAd.objects.filter(
                deleted=False, visible=True, autohide_at__gt=timezone.now()
            )
        return JobAd.objects.filter(deleted=False)

    def destroy(self, request, pk=None, *args, **kwargs):
        """Flag the ad as deleted instead of removing the row."""
        try:
            job_ad = self.get_object()
            job_ad.deleted = True
            job_ad.save()
        except ObjectDoesNotExist:
            return JsonResponse(status=404, data={"error": f"Job Ad {pk} not found"})
        else:
            return JsonResponse(status=200, data={"message": "OK"})
@require_http_methods(["GET"])
def nginx_jwt_resp(request, *args, **kwargs):
    """Check the ``jwt_access`` cookie and report the user role in a header.

    Responds 401 when the cookie is missing or empty and 403 when the token
    fails to decode. Otherwise responds 200 with ``X-FBrowser-User`` set to
    ``"admin"`` when the token's ``username`` claim is ``"admin"``, and to
    ``"moderator"`` for any other token.
    """
    access_token = request.COOKIES.get("jwt_access", None)
    if not access_token:
        return HttpResponse("", status=401)

    try:
        # This also verifies the signature.
        # See https://pyjwt.readthedocs.io/en/latest/usage.html#reading-the-claimset-without-validation
        claims = decode(access_token, settings.SECRET_KEY, algorithms=["HS256"])
    except InvalidTokenError:
        return HttpResponse("", status=403)

    if claims.get("username", "") == "admin":
        role = "admin"
    else:
        role = "moderator"

    response = HttpResponse("", status=200)
    response["X-FBrowser-User"] = role
    return response