175 lines
5.2 KiB
Python
175 lines
5.2 KiB
Python
"""Webapp utils."""
|
|
|
|
import logging
|
|
import base64
|
|
import uuid
|
|
import imghdr
|
|
import markdown
|
|
import sendgrid
|
|
from django.utils import timezone
|
|
from sendgrid.helpers.mail import (
|
|
Email,
|
|
To,
|
|
Subject,
|
|
Mail,
|
|
HtmlContent,
|
|
PlainTextContent,
|
|
)
|
|
from django.template.loader import render_to_string
|
|
from django.core.files.base import ContentFile
|
|
from sikweb.settings import (
|
|
DEPLOY_ENV,
|
|
FRONTEND_URL,
|
|
EMAIL_API_KEY,
|
|
DEFAULT_EMAIL_FROM,
|
|
DEFAULT_EMAIL_FROM_ADDR,
|
|
ENABLE_AUTOMATIC_EMAILS,
|
|
GROUP_KEY,
|
|
GOOGLE_CREDS,
|
|
)
|
|
from datetime import timedelta
|
|
|
|
from google.oauth2 import service_account
|
|
from googleapiclient.discovery import build
|
|
from googleapiclient.errors import HttpError
|
|
|
|
|
|
def get_file_extension(file_name, decoded_file):
|
|
extension = imghdr.what(file_name, decoded_file)
|
|
extension = "jpg" if extension == "jpeg" else extension
|
|
return extension
|
|
|
|
|
|
def decode_base64_file(data):
|
|
# Check if this is a base64 string
|
|
if isinstance(data, str):
|
|
# Check if the base64 string is in the "data:" format
|
|
if "data:" in data and ";base64," in data:
|
|
# Break out the header from the base64 content
|
|
header, data = data.split(";base64,")
|
|
|
|
# Try to decode the file. Return validation error if it fails.
|
|
try:
|
|
decoded_file = base64.b64decode(data)
|
|
except TypeError:
|
|
TypeError("invalid_image")
|
|
|
|
# Generate file name:
|
|
file_name = str(uuid.uuid4())
|
|
|
|
# Get the file name extension:
|
|
file_extension = get_file_extension(file_name, decoded_file)
|
|
|
|
complete_file_name = "%s.%s" % (
|
|
file_name,
|
|
file_extension,
|
|
)
|
|
|
|
return ContentFile(decoded_file, name=complete_file_name)
|
|
|
|
|
|
def month_from_now():
|
|
"""Return date one month from now."""
|
|
return timezone.now() + timedelta(days=30)
|
|
|
|
|
|
def send_email(to: str, subject: str, body: str, html: bool = False):
|
|
if ENABLE_AUTOMATIC_EMAILS is False:
|
|
logging.debug("Skipping email")
|
|
logging.debug(f"to: {to}")
|
|
logging.debug(f"subject: {subject}")
|
|
logging.debug(f"body: {body}")
|
|
return
|
|
|
|
from_email = Email(email=DEFAULT_EMAIL_FROM_ADDR, name=DEFAULT_EMAIL_FROM)
|
|
to_email = To(email=to)
|
|
sub = Subject(subject=subject)
|
|
|
|
html_content = None
|
|
plain_text_content = None
|
|
if html:
|
|
html_content = HtmlContent(content=body)
|
|
else:
|
|
plain_text_content = PlainTextContent(content=body)
|
|
|
|
mail = Mail(
|
|
from_email=from_email,
|
|
to_emails=to_email,
|
|
subject=sub,
|
|
html_content=html_content,
|
|
plain_text_content=plain_text_content,
|
|
)
|
|
|
|
try:
|
|
sg = sendgrid.SendGridAPIClient(api_key=EMAIL_API_KEY)
|
|
response = sg.send(mail)
|
|
|
|
if response.status_code != 202:
|
|
raise Exception(f"Failed to send email: {response.body}")
|
|
|
|
except Exception as ex:
|
|
logging.exception("Failed to send email.")
|
|
|
|
|
|
def send_signup_email(to, subject, id, uuid, content):
|
|
message = render_to_string(
|
|
"webapp/signup_email.html",
|
|
{
|
|
"url": f"https://{FRONTEND_URL}/signup/edit/{id}/{uuid}",
|
|
"content": markdown.markdown(content),
|
|
},
|
|
)
|
|
|
|
return send_email(to, subject, message, True)
|
|
|
|
|
|
def admin_send_email_signupees(list, subject, content):
|
|
for to in list:
|
|
send_email(to.email, subject, markdown.markdown(content), True)
|
|
|
|
|
|
def add_to_mailinglist(email: str):
|
|
try:
|
|
# get data
|
|
SCOPES = ["https://www.googleapis.com/auth/admin.directory.group"]
|
|
|
|
# create credentials, with subject is used to impersonate admin account
|
|
# jas_manager has groups editor rights in google admin
|
|
credentials = service_account.Credentials.from_service_account_info(
|
|
info=GOOGLE_CREDS, scopes=SCOPES
|
|
).with_subject("jas_manager@sahkoinsinoorikilta.fi")
|
|
|
|
service = build("admin", "directory_v1", credentials=credentials)
|
|
service.members().insert(groupKey=GROUP_KEY, body={"email": email}).execute()
|
|
except HttpError as err:
|
|
# Already in list, do nothing
|
|
print()
|
|
print(err)
|
|
print()
|
|
|
|
if err.status_code == 409:
|
|
pass
|
|
else:
|
|
logging.exception("Failed adding user to list")
|
|
|
|
# Send email notificcation to maintainer, only in prod
|
|
if DEPLOY_ENV == "production":
|
|
to = "ilari.ojakorpi@sahkoinsinoorikilta.fi"
|
|
subject = "Web error: Failed adding to google groups"
|
|
body = "Error code: {}\nError details: {}\nEmail that was not added: {}\n\nAdd user manually to jäsenet groups.".format(
|
|
err.status_code, err.error_details, email
|
|
)
|
|
|
|
send_email(to, subject, body)
|
|
""" except ValueError as err:
|
|
logging.exception("Formatting of google credentials is incorrect")
|
|
|
|
if DEPLOY_ENV == "production":
|
|
to = "ilari.ojakorpi@sahkoinsinoorikilta.fi"
|
|
subject = "Web error: Failed adding to google groups"
|
|
body = "Google credential formatted incorretly\nEmail that was not added: {}\n\nAdd user manually to jäsenet groups.".format(
|
|
email
|
|
)
|
|
|
|
send_email(to, subject, body) """
|