Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • startuplab/courses/tjts5901-continuous-software-engineering/TJTS5901-K23_template
  • planet-of-the-apes/tjts-5901-apeuction
  • uunot-yliopiston-leivissa/tjts-5901-uunot
  • contain-the-cry/tjts-5901-auction-system
  • avengers/avengers
  • cse6/cse-6
  • 13th/13-sins-of-gitlab
  • fast-and-furious/fast-and-furious
  • back-to-the-future/delorean-auction
  • monty-pythons-the-meaning-of-life/the-meaning-of-life
  • team-atlantis/the-empire
  • code-with-the-wind/auction-project
  • the-pirates/the-pirates
  • do-the-right-thing/do-the-right-thing
  • inception/inception
  • the-social-network-syndicate/the-social-auction-network
  • team-the-hunt-for-red-october/tjts-5901-k-23-red-october
  • good-on-paper/good-paper-project
  • desperados/desperados
19 results
Show changes
Commits on Source (133)
Showing with 1714 additions and 64 deletions
......@@ -165,3 +165,6 @@ cython_debug/
_docs/
.env
.DS_Store
# Ignore certificates
azure-sp.pem
......@@ -2,6 +2,8 @@
stages:
- build
- test
- staging
- smoketest
- deploy
variables:
......@@ -13,6 +15,8 @@ variables:
## (Optional) More verbose output from pipeline. Enabling it might reveal secrets.
#CI_DEBUG_TRACE: "true"
include:
- template: Jobs/SAST.gitlab-ci.yml
## Use buildkit to build the container.
## Buildkit: https://github.com/moby/buildkit
......@@ -45,6 +49,15 @@ build:
--opt build-arg:CI_COMMIT_SHA=${CI_COMMIT_SHA} \
--output type=image,name=${DOCKER_IMAGE_NAME}:${DOCKER_TAG},push=true
sast:
## Static Application Security Test
## You can override the included template(s) by including variable overrides
## SAST customization:
## https://docs.gitlab.com/ee/user/application_security/sast/#customizing-the-sast-settings
stage: test
## Run the tests. If any of the tests fails, pipeline is rejected.
test:
## Optional: include stage and environment name
......@@ -60,14 +73,104 @@ test:
- name: mongo:4.2 # update to reflect same version used on production
alias: mongo
script:
- pip install --disable-pip-version-check -e .[test]
- pytest -v
- echo "Test run succesfully!"
- pip install --disable-pip-version-check -e /app[test]
## Run tests with coverage reporting
- coverage run -m pytest
## Run basic reporting for badge
- coverage report
## Generate detailed report for gitlab annotations.
- coverage xml -o coverage.xml
coverage: '/(?i)total.*? (100(?:\.0+)?\%|[1-9]?\d(?:\.\d+)?\%)$/'
artifacts:
reports:
coverage_report:
coverage_format: cobertura
path: coverage.xml
## Job to setup new staging job.
deploy to staging:
stage: staging
## Only run this stage when main branch receives changes.
only:
- main
## Use microsoft provided azure cli image, that contains az cli.
image: mcr.microsoft.com/azure-cli
## Setup the environment variables. The can be accessed through the gitlab
## Deployments -> Environments. Generates url based on the app and branch name.
environment:
name: $CI_JOB_STAGE
url: https://${AZURE_APP_NAME}-${CI_ENVIRONMENT_SLUG}.azurewebsites.net
before_script:
## Make sanity check that gitlab variables stage is done.
- test -z "${AZURE_SP_NAME}" && (echo "Missing required variable AZURE_SP_NAME. See 'Staging.md'"; exit 1)
- test -f "${AZURE_SP_CERT}" || ( echo "AZURE_SP_CERT (${AZURE_SP_CERT}) file is missing!"; exit 1)
- test -z "${AZURE_APP_NAME}" && (echo "Missing required variable AZURE_APP_NAME. See 'Staging.md'"; exit 1)
- test -z "${AZURE_RESOURCE_GROUP}" && (echo "Missing required variable DOCKER_AUTH_CONFIG. See 'Staging.md'"; exit 1)
## Login into azure
- az login --service-principal -u "${AZURE_SP_NAME}" -p "${AZURE_SP_CERT}" --tenant "jyu.onmicrosoft.com"
script:
## Create staging slot and copy settings from production
- |
az webapp deployment slot create -n "$AZURE_APP_NAME" -g "$AZURE_RESOURCE_GROUP" \
--slot "$CI_ENVIRONMENT_SLUG" --configuration-source "$AZURE_APP_NAME"
## TODO: Create a snapshot of database, and use it.
## If you need to change settings see: https://docs.microsoft.com/en-us/cli/azure/webapp/config/appsettings
## Change container tag to reflect branch we're running on
- |
az webapp config container set -n "$AZURE_APP_NAME" -g "$AZURE_RESOURCE_GROUP" \
--docker-custom-image-name "${DOCKER_IMAGE_NAME}:${DOCKER_TAG}" -s "$CI_ENVIRONMENT_SLUG"
## In case slot already existed, restart the slot
- az webapp restart -n "$AZURE_APP_NAME" -g "$AZURE_RESOURCE_GROUP" -s "$CI_ENVIRONMENT_SLUG"
## Restart is not immediate, it takes a sec or two, depending on container changes.
- sleep 20
## Deploy latest image to the production
deploy:
## Store server info as artifact for prosperity
- curl "$CI_ENVIRONMENT_URL/server-info" -o server-info.json
artifacts:
paths:
- server-info.json
## Run smoketest to check that staging app is responding as expected.
staging smoketest:
stage: smoketest
image: ${DOCKER_IMAGE_NAME}:${DOCKER_TAG}
environment:
name: staging
url: https://${AZURE_APP_NAME}-${CI_ENVIRONMENT_SLUG}.azurewebsites.net
only:
- main
script:
- pip install --disable-pip-version-check -e .[test]
## Environment url can be inherited from CI_ENVIROMNENT_URL, which is one defined
## in the `environment.url:`. Using it here explicitly.
- echo "Testing againsta deployment address ${CI_ENVIROMNENT_URL}"
- pytest -v --environment-url="${CI_ENVIROMNENT_URL}" tests/test_smoke.py
## Push latest image into registry with the `latest` tag.
docker tag latest:
stage: deploy
environment: production
environment:
name: production
image: docker:20.10.23
only:
- main
......@@ -76,3 +179,26 @@ deploy:
- mkdir -p ${HOME}/.docker && echo "${DOCKER_AUTH_CONFIG}" > "${HOME}/.docker/config.json"
## Add the `latest` tag to the image we have build.
- docker buildx imagetools create ${DOCKER_IMAGE_NAME}:${DOCKER_TAG} --tag ${DOCKER_IMAGE_NAME}:latest
## Swap the production and staging slots around.
staging to production:
stage: deploy
## Only run this stage when main branch receives changes.
only:
- main
## Use microsoft provided azure cli image, that contains az cli.
image: mcr.microsoft.com/azure-cli
environment:
name: production
url: https://${AZURE_APP_NAME}.azurewebsites.net/
before_script:
## Login into azure
- az login --service-principal -u "${AZURE_SP_NAME}" -p "${AZURE_SP_CERT}" --tenant "jyu.onmicrosoft.com"
script:
## Swap production and staging slots.
- az webapp deployment slot swap -g "$AZURE_RESOURCE_GROUP" -n "$AZURE_APP_NAME" -s staging --target-slot production
## Summary
Briefly describe the issue and its impact on the project concisely.
## Steps to Reproduce
1. List the steps to reproduce the issue
2. Provide any relevant details such as browser, device, version, etc.
## Expected Outcome
What should happen after following the steps?
## Actual Outcome
What actually happens after following the steps?
## Additional Context
- Include any relevant screenshots, logs, or code snippets
- Indicate if the issue occurs only in certain conditions, environments, or browsers
## Possible fixes
If you are familiar with the issue and in a position to help, it would be appreciated!
/label ~bug
......@@ -21,7 +21,7 @@ WORKDIR /app
## Declare default flask app as environment variable
## https://flask.palletsprojects.com/en/2.2.x/cli/
ARG FLASK_APP=tjts5901.app
ARG FLASK_APP=tjts5901.app:flask_app
ENV FLASK_APP=${FLASK_APP}
## Setup the default port for flask to listen on
......@@ -31,13 +31,13 @@ ENV FLASK_RUN_PORT=${FLASK_RUN_PORT}
## Run Flask app when container started, and listen all the interfaces
## Note: CMD doesn't run command in build, but defines an starting command
## when container is started (or arguments for ENTRYPOINT).
CMD flask run --host=0.0.0.0 # --port=${FLASK_RUN_PORT} --app=${FLASK_APP}
#CMD flask run --host=0.0.0.0 # --port=${FLASK_RUN_PORT} --app=${FLASK_APP}
CMD gunicorn --bind "0.0.0.0:${FLASK_RUN_PORT}" "${FLASK_APP}"
## Examples for other commands:
## Run nothing, so that the container can be used as a base image
#CMD ["bash", "-c", "sleep infinity"]
## Run Flask app using Gunicorn, which unlike Flask, doesn't complain about being development thing.
#CMD gunicorn --bind "0.0.0.0:${PORT}"" tjts5901.app:app
## Install requirements using pip. This is done before copying the app, so that
## requirements layer is cached. This way, if app code changes, only app code is
......@@ -49,6 +49,10 @@ RUN pip --disable-pip-version-check install -r /tmp/pip-tmp/requirements.txt &&
## Copy app to WORKDIR folder
COPY . .
## Compile translations before installing app. This is done to make sure that
## translations are compiled before app is installed.
RUN pybabel compile -f -d src/tjts5901/translations/
## Install self as editable (`-e`) module. In a long run it would be recommeded
## to remove `COPY` and only install app as a package.
RUN pip --disable-pip-version-check install -v -e .
......@@ -58,6 +62,9 @@ RUN pip --disable-pip-version-check install -v -e .
ARG CI_COMMIT_SHA
ENV CI_COMMIT_SHA=${CI_COMMIT_SHA}
## Download the currency exchange rates from European Central Bank
RUN flask update-currency-rates
## Save build date and time
RUN echo "BUILD_DATE=$(date -u +'%Y-%m-%dT%H:%M:%SZ')" >> /app/.env
......
......@@ -25,3 +25,8 @@ docker run -it -p 5001:5001 -e "FLASK_DEBUG=1" -v "${PWD}:/app" tjts5901
```
Please see the `docs/tjts5901` folder for more complete documentation.
## Reporting issues and bugs
To report bugs, please use [the project "issues" form](https://gitlab.jyu.fi/startuplab/courses/tjts5901-continuous-software-engineering/TJTS5901-K23_template/-/issues/new?issuable_template=Default)
[python: **.py]
[jinja2: **/templates/**.html]
......@@ -67,6 +67,10 @@ plugins:
fallback_to_build_date: true
- macros:
module_name: macros
# Change to something else
j2_variable_start_string: "{j2{"
j2_variable_end_string: "}j2}"
extra:
social:
......
......@@ -29,3 +29,6 @@ MONGO_URL=mongodb://mongodb:27017/tjts5901
# Setup sentry environment to separate development issues from production. This variable name is
# maybe set by gitlab pipeline. <https://docs.gitlab.com/ee/ci/environments/>
CI_ENVIRONMENT_NAME=development
# Setup CI environment url to point on localhost for testing purposes.
CI_ENVIRONMENT_URL=http://localhost:5001
......@@ -29,6 +29,9 @@ dependencies = {file = ["requirements.txt"]}
test = [
"pytest",
"coverage",
"coverage[toml]",
"requests",
"faker",
]
docs = [
"mkdocs",
......@@ -45,3 +48,15 @@ build-backend = "setuptools.build_meta"
testpaths = [
"tests",
]
filterwarnings = [
## Silence json encoder warnings for Flask >=2.3
# "ignore::DeprecationWarning:mongoengine.connection",
# "ignore::DeprecationWarning:flask_mongoengine.json",
# "ignore::DeprecationWarning:flask.json.provider",
]
[tool.coverage.run]
branch = true
source_pkgs = [
"tjts5901",
]
......@@ -4,9 +4,15 @@ importlib-metadata
# Framework and libraries
flask==2.2.2
python-dotenv
flask-login
flask-babel
flask-mongoengine==1.0
CurrencyConverter
Flask-APScheduler
# Git hooks
pre-commit
......@@ -15,4 +21,4 @@ sentry-sdk[flask]
sentry-sdk[pymongo]
# More production-ready web server
#gunicorn
gunicorn
......@@ -20,8 +20,11 @@ from flask import (
request,
)
from flask_babel import _
from .utils import get_version
from .db import init_db
from .i18n import init_babel
logger = logging.getLogger(__name__)
......@@ -36,7 +39,7 @@ def create_app(config: Optional[Dict] = None) -> Flask:
flask_app.config.from_mapping(
SECRET_KEY='dev',
BRAND="Hill Valley DMC dealership",
BRAND=_("Hill Valley DMC dealership"),
)
# load the instance config, if it exists, when not testing
......@@ -55,9 +58,16 @@ def create_app(config: Optional[Dict] = None) -> Flask:
except OSError:
pass
# Initialize the Flask-Babel extension.
init_babel(flask_app)
# Initialize the database connection.
init_db(flask_app)
# Initialize the scheduler.
from .scheduler import init_scheduler # pylint: disable=import-outside-toplevel
init_scheduler(flask_app)
@flask_app.route('/debug-sentry')
def trigger_error():
division_by_zero = 1 / 0
......@@ -65,13 +75,20 @@ def create_app(config: Optional[Dict] = None) -> Flask:
# a simple page that says hello
@flask_app.route('/hello')
def hello():
return 'Hello, World!'
return _('Hello, World!')
from . import auth
flask_app.register_blueprint(auth.bp)
from .auth import init_auth
init_auth(flask_app)
from .notification import init_notification
init_notification(flask_app)
from .currency import init_currency
init_currency(flask_app)
from . import items
flask_app.register_blueprint(items.bp)
flask_app.register_blueprint(items.api)
flask_app.add_url_rule('/', endpoint='index')
return flask_app
......@@ -84,7 +101,6 @@ load_dotenv()
# Create the Flask application.
flask_app = create_app()
@flask_app.route("/server-info")
def server_info() -> Response:
"""
......
from datetime import datetime
import functools
import logging
from flask import (
Blueprint, flash, g, redirect, render_template, request, session, url_for
Blueprint, flash, redirect, render_template, request, session, url_for, abort
)
from flask_login import (
LoginManager,
login_user,
logout_user,
login_required,
current_user,
)
from flask_babel import _
from babel.dates import get_timezone
from werkzeug.security import check_password_hash, generate_password_hash
from sentry_sdk import set_user
from .models import User
from .models import AccessToken, Bid, User, Item
from mongoengine import DoesNotExist
from mongoengine.queryset.visitor import Q
bp = Blueprint('auth', __name__, url_prefix='/auth')
logger = logging.getLogger(__name__)
def init_auth(app):
"""
Integrate authentication into the application.
"""
app.register_blueprint(bp)
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.user_loader(load_logged_in_user)
app.config['AUTH_HEADER_NAME'] = 'Authorization'
login_manager.request_loader(load_user_from_request)
login_manager.init_app(app)
@bp.before_app_request
def load_logged_in_user():
logger.debug("Initialized authentication")
def load_user_from_request(request):
"""
If a user id is stored in the session, load the user object from
Load a user from the request.
This function is used by Flask-Login to load a user from the request.
"""
api_key = request.headers.get("Authorization")
user_id = session.get('user_id')
if api_key:
api_key = api_key.replace("Bearer ", "", 1)
try:
token = AccessToken.objects.get(token=api_key)
if token.expires and token.expires < datetime.utcnow():
logger.warning("Token expired: %s", api_key)
return None
# User is authenticated
if user_id is None:
g.user = None
set_user(None)
token.last_used_at = datetime.utcnow()
token.save()
logger.debug("User authenticated via token: %r", token.user.email, extra={
"user": token.user.email,
"user_id": str(token.user.id),
"token": token.token,
})
return token.user
except DoesNotExist:
logger.error("Token not found: %s", api_key)
else:
g.user = User.objects.get(id=user_id)
set_user({"id": str(g.user.id), "email": g.user.email})
return None
def login_required(view):
@functools.wraps(view)
def wrapped_view(**kwargs):
if g.user is None:
return redirect(url_for('auth.login'))
def load_logged_in_user(user_id):
"""
Load a user from the database, given the user's id.
"""
try:
user = User.objects.get(id=user_id)
set_user({"id": str(user.id), "email": user.email})
except DoesNotExist:
logger.error("User not found: %s", user_id)
return None
return view(**kwargs)
return user
return wrapped_view
def get_user_by_email(email: str) -> User:
"""
Get a user from the database, given the user's email.
If the email is 'me', then the current user is returned.
:param email: The email of the user to get.
"""
if email is None:
abort(404)
if email == "me" and current_user.is_authenticated:
email = current_user.email
try:
user = User.objects.get_or_404(email=email)
except DoesNotExist:
logger.error("User not found: %s", email)
abort(404)
return user
@bp.route('/register', methods=('GET', 'POST'))
......@@ -59,11 +132,21 @@ def register():
elif not terms:
error = 'You must agree to the terms.'
timezone = None
try:
if user_tz := request.form.get('timezone', None):
timezone = get_timezone(user_tz).zone
except Exception as exc:
logger.debug("Error getting timezone from user data: %s", exc)
timezone = None
if error is None:
try:
user = User(
email=email,
password=generate_password_hash(password)
password=generate_password_hash(password),
timezone=timezone,
)
user.save()
flash("You have been registered. Please log in.")
......@@ -85,6 +168,7 @@ def login():
email = request.form['email']
password = request.form['password']
user = None
error = None
try:
user = User.objects.get(email=email)
......@@ -97,19 +181,119 @@ def login():
error = 'Incorrect password.'
if error is None:
session.clear()
session['user_id'] = str(user['id'])
flash(f"Hello {email}, You have been logged in.")
return redirect(url_for('items.index'))
remember_me = bool(request.form.get("remember-me", False))
if login_user(user, remember=remember_me):
flash(f"Hello {email}, You have been logged in.")
next = request.args.get('next')
# Better check that the user actually clicked on a relative link
# or else they could redirect you to a malicious website!
if next is None or not next.startswith('/'):
next = url_for('index')
return redirect(next)
else:
error = "Error logging in."
print("Error logging in:", error)
logger.info("Error logging user in: %r: Error: %s", email, error)
flash(error)
return render_template('auth/login.html')
@bp.route('/logout')
@login_required
def logout():
session.clear()
"""
Log out the current user.
Also removes the "remember me" cookie.
"""
logout_user()
flash("You have been logged out.")
return redirect(url_for('index'))
@bp.route('/profile', defaults={'email': 'me'})
@bp.route('/profile/<email>')
@login_required
def profile(email):
"""
Show the user's profile page for the given email.
If the email is 'me', then the current user's profile is shown.
"""
user: User = get_user_by_email(email)
# List the items user has created
items = Item.objects(seller=user).all()
# List the items user has won
# TODO: Could be done smarter with a join
bids = Bid.objects(bidder=user).only("id").all()
won_items = Item.objects(winning_bid__in=bids).all()
return render_template('auth/profile.html', user=user, items=items, won_items=won_items)
@bp.route('/profile/<email>/token', methods=('GET', 'POST'), defaults={'email': 'me'})
@login_required
def user_access_tokens(email):
"""
Show the user's tokens page for the given email.
"""
user: User = get_user_by_email(email)
# Fetch all the user tokens that are active or have no expire date
tokens = AccessToken.objects(Q(expires__gte=datetime.now()) | Q(expires=None), user=user).all()
token = None
if request.method == 'POST':
try:
name = request.form['name']
if expires := request.form.get('expires'):
expires = datetime.fromisoformat(expires)
else:
expires = None
token = AccessToken(
user=user,
name=name,
expires=expires,
)
token.save()
except KeyError as exc:
logger.debug("Missing required field: %s", exc)
flash(_("Required field missing"))
except Exception as exc:
logger.exception("Error creating token: %s", exc)
flash(_("Error creating token: %s") % exc)
else:
flash(_("Created token: %s") % token.name)
return render_template('auth/tokens.html', user=user, tokens=tokens, token=token)
@bp.route('/profile/<email>/token/<id>', methods=('POST',))
def delete_user_access_token(email, id):
"""
Delete an access token.
"""
user = get_user_by_email(email)
token = AccessToken.objects.get_or_404(id=id)
if token.user != user:
logger.warning("User %s tried to delete token %s", user.email, token.name, extra={
"user": user.email,
"token": str(token.id),
"token_user": token.user.email,
})
abort(403)
token.delete()
flash(f"Deleted token {token.name}")
return redirect(url_for('auth.user_access_tokens', email=token.user.email))
"""
Currency module.
This module contains the currency module, which is used to convert currencies.
Uses the ECB (European Central Bank) as the source of currency conversion rates.
To update the currency conversion rates, run the following command:
$ flask update-currency-rates
"""
from decimal import Decimal
import logging
import os
from pathlib import Path
from zipfile import ZipFile
import urllib.request
import click
from currency_converter import (
SINGLE_DAY_ECB_URL,
CurrencyConverter,
)
from flask_babel import (
get_locale,
format_currency,
)
from babel.numbers import get_territory_currencies, parse_decimal
from flask import (
Flask,
current_app,
render_template,
)
from markupsafe import Markup
from .auth import current_user
REF_CURRENCY = 'EUR'
"Reference currency for the currency converter."
logger = logging.getLogger(__name__)
class CurrencyProxy:
"""
Proxy for the currency converter.
This class is used to proxy the currency converter instance. This is to
ensure that the currency converter is only initialized when it is actually
used, and the used conversion list is the most up-to-date.
"""
def __init__(self, app: Flask):
self._converter = None
self._app = app
self._converter_updated = 0
self._dataset_updated = 0
def get_currency_converter(self) -> CurrencyConverter:
"""
Get a currency converter instance.
Automatically updates the currency converter if the dataset has been
updated.
Exceptions:
RuntimeError: If the currency file is not configured.
FileNotFoundError: If the currency file does not exist.
:return: A currency converter instance.
"""
if not (conversion_file := self._app.config.get('CURRENCY_FILE')):
raise RuntimeError('Currency file not configured.')
# Initialize the currency converter if it has not been initialized yet,
# or if the dataset has been updated.
self._dataset_updated = Path(conversion_file).stat().st_mtime
if self._converter is None or self._dataset_updated > self._converter_updated:
logger.info("Initializing currency converter with file %s.", conversion_file)
self._converter = CurrencyConverter(
currency_file=conversion_file,
ref_currency=REF_CURRENCY,
)
self._converter_updated = self._dataset_updated
return self._converter
def __getattr__(self, name):
"""
Proxy all other attributes to the currency converter.
"""
return getattr(self.get_currency_converter(), name)
def init_currency(app: Flask):
"""
Initialize the currency module.
This function initializes the currency module, and registers the currency
converter as an extension.
:param app: The Flask application.
:return: None
"""
# Set default currency file path
app.config.setdefault('CURRENCY_FILE', app.instance_path + '/currency.csv')
# Register the currency converter as an extension
app.extensions['currency_converter'] = CurrencyProxy(app)
# Register the currency converter as a template filter
app.add_template_filter(format_converted_currency, name='localcurrency')
app.cli.add_command(update_currency_rates)
def format_converted_currency(value, currency=None, **kwargs):
"""
Render a currency value in the preferred currency.
This function renders a currency value in the preferred currency for the
current locale. If the preferred currency is not the reference currency,
the value is converted to the preferred currency.
"""
if currency is None:
currency = get_preferred_currency()
# Convert the value to the preferred currency
local_value = convert_currency(value, currency)
# Format the value
html = render_template("money-tag.html",
base_amount=format_currency(value, currency=REF_CURRENCY, format_type='name', **kwargs),
local_amount=format_currency(local_value, currency=currency, **kwargs))
return Markup(html)
def convert_currency(value, currency=None, from_currency=REF_CURRENCY):
"""
Convert a currency value to the preferred currency.
This function converts a currency value to the preferred currency for the
current locale. If the preferred currency is not the reference currency,
the value is converted to the preferred currency.
"""
if currency != REF_CURRENCY:
return current_app.extensions['currency_converter'].convert(value, from_currency, currency)
return value
def convert_from_currency(value, currency) -> Decimal:
"""
Parses the localized currency value and converts it to the reference currency.
"""
locale = get_locale()
amount = parse_decimal(value, locale=locale)
if currency != REF_CURRENCY:
amount = Decimal(current_app.extensions['currency_converter'].convert(amount, currency, REF_CURRENCY))
return amount
def get_currencies():
"""
Get the list of supported currencies.
"""
return current_app.extensions['currency_converter'].currencies
def get_preferred_currency():
"""
Get the preferred currency.
This function returns the preferred currency for the current locale.
:return: The preferred currency.
"""
if current_user.is_authenticated and current_user.currency:
return str(current_user.currency)
# Fall back to the default currency for the locale
if territory := get_locale().territory:
currency = get_territory_currencies(territory)[0]
if currency in get_currencies():
return currency
else:
logger.warning("Default currency %s is not supported, falling back to %s.", currency, REF_CURRENCY)
return REF_CURRENCY
@click.command()
def update_currency_rates():
"""
Update currency file from the European Central Bank.
This command is meant to be run from the command line, and is not meant to be
used in the application:
$ flask update-currency-rates
:return: None
"""
click.echo('Updating currency file from the European Central Bank...')
fetch_currency_file()
click.echo('Done.')
def fetch_currency_file():
"""
Fetch the currency file from the European Central Bank.
This function fetches the currency file from the European Central Bank, and
stores it in the configured currency file path.
"""
from tempfile import NamedTemporaryFile # pylint: disable=import-outside-toplevel
fd, _ = urllib.request.urlretrieve(SINGLE_DAY_ECB_URL)
with ZipFile(fd) as zf:
file_name = zf.namelist().pop()
# Create a temporary file to store the currency file, to avoid corrupting
# the existing file if the download fails, or while writing the file.
file_path = os.path.dirname(current_app.config['CURRENCY_FILE'])
if not os.path.exists(file_path):
os.makedirs(file_path)
with NamedTemporaryFile(dir=file_path, delete=False) as f:
f.write(zf.read(file_name))
f.flush()
# Move the temporary file to the configured currency file path
os.rename(f.name, current_app.config['CURRENCY_FILE'])
"""
Internationalisation and localisation support for the application.
"""
from enum import Enum
import os
from typing import List
from flask_babel import Babel, get_locale as get_babel_locale
from babel import Locale
from babel import __version__ as babel_version
from flask import (
Flask,
g,
request,
session,
)
from werkzeug.datastructures import LanguageAccept
from flask_login import current_user
import logging
logger = logging.getLogger(__name__)
class SupportedLocales(Enum):
"""
Supported locales for the application.
The values are the locale identifiers used by the Babel library.
Order here determines the order in which the locales are presented to the
user, and the order in which the locales are tried when the user does not
specify a preferred locale.
"""
FI = "fi_FI.UTF-8"
"Finnish (Finland)"
SV = "sv_SE.UTF-8"
"Swedish (Sweden)"
EN = "en_GB.UTF-8"
"English (United Kingdom)"
# EN_US = "en_US.UTF-8"
# "English (United States)"
TLH = "tlh"
"Klingon"
TIMEZONES = {
"""
Timezones for supported locales.
The values are the timezone identifiers used by the Babel library.
This approach doesnt work for countries that have multiple timezones, like
the US.
"""
"fi_FI": "Europe/Helsinki",
"sv_SE": "Europe/Stockholm",
"en_GB": "Europe/London",
"tlh": "America/New_York",
}
def init_babel(flask_app: Flask):
"""
Initialize the Flask-Babel extension.
"""
# Monkeypatch klingon support into babel
# Klingon reverts to English
hack_babel_core_to_support_custom_locales({"tlh": "en"})
# Configure the Flask-Babel extension.
# Try setting the default locale from underlying OS. Falls back into English.
system_language = Locale.default().language
translation_dir = os.path.join(os.path.dirname(__file__), "translations")
flask_app.config.setdefault("BABEL_TRANSLATION_DIRECTORIES", translation_dir)
flask_app.config.setdefault("BABEL_DEFAULT_LOCALE", system_language)
babel = Babel(flask_app, locale_selector=get_locale, timezone_selector=get_timezone)
# Register `locales` as jinja variable to be used in templates. Uses the
# `Locale` class from the Babel library, so that the locale names can be
# translated.
locales = {}
for locale in SupportedLocales:
locales[locale.value] = Locale.parse(locale.value)
flask_app.jinja_env.globals.update(locales=locales)
# Register `get_locale` as jinja function to be used in templates
flask_app.jinja_env.globals.update(get_locale=get_babel_locale)
# If url contains locale parameter, set it as default in session
@flask_app.before_request
def set_locale():
if request.endpoint != "static":
if locale := request.args.get('locale'):
if locale in (str(l) for l in locales.values()):
logger.debug("Setting locale %s from URL.", locale)
session['locale'] = locale
else:
logger.warning("Locale %s not supported.", locale)
logger.info("Initialized Flask-Babel extension %s.", babel_version,
extra=flask_app.config.get_namespace("BABEL_"))
return babel
def hack_babel_core_to_support_custom_locales(custom_locales: dict):
""" Hack Babel core to make it support custom locale names
Based on : https://github.com/python-babel/babel/issues/454
Patch mechanism provided by @kolypto
Args:
custom_locales: Mapping from { custom name => ordinary name }
"""
from babel.core import get_global
# In order for Babel to know "en_CUSTOM", we have to hack its database and put our custom
# locale names there.
# This database is pickle-loaded from a .dat file and cached, so we only have to do it once.
db = get_global('likely_subtags')
for custom_name in custom_locales:
db[custom_name] = custom_name
# Also, monkey-patch the exists() and load() functions that load locale data from 'babel/locale-data'
import babel.localedata
# Originals
o_exists, o_load, o_parse_locale = babel.localedata.exists, babel.localedata.load, babel.core.parse_locale
# Definitions
def exists(name):
# Convert custom names to normalized names
name = custom_locales.get(name, name)
return o_exists(name)
def load(name, merge_inherited=True):
# Convert custom names to normalized names
original_name = custom_locales.get(name, name)
l_data = o_load(original_name, merge_inherited)
l_data['languages']['tlh'] = 'Klingon'
l_data.update({
'locale_id': name,
})
return l_data
# Definitions
def parse_locale(name, sep='_'):
# Convert custom names to normalized names
name = custom_locales.get(name, name)
l_data = o_parse_locale(name, sep)
return l_data
# Make sure we do not patch twice
if o_exists.__module__ != __name__:
babel.localedata.exists = exists
babel.localedata.load = load
# if o_parse_locale.__module__ != __name__:
# babel.core.parse_locale = parse_locale
# See that they actually exist
for normalized_name in custom_locales.values():
assert o_exists(normalized_name)
def get_locale():
"""
Get the locale for user.
Looks at the user model for the user's preferred locale. If the user has not
set a preferred locale, check the browser's Accept-Language header. If the
browser does not specify a preferred locale, use the default locale.
todo: What happens if the user's preferred locale support is dropped from
todo: the application?
:return: Suitable locale for the user.
"""
# if a locale was stored in the session, use that
if locale := session.get('locale'):
logger.debug("Setting locale %s from session.", locale)
return locale
# if a user is logged in, use the locale from the user settings
if current_user.is_authenticated and current_user.locale:
logger.debug("Using locale %s from user settings.", current_user.locale)
return current_user.locale
# otherwise try to guess the language from the user accept header the
# browser transmits.
# The Accept-Language header is a list of languages the user prefers,
# ordered by preference. The first language is the most preferred.
# The language is specified as a language tag, which is a combination of
# a language code and a country code, separated by a hyphen.
# For example, en-GB is English (United Kingdom).
# The language code is a two-letter code, and the country code is a
# two-letter code, or a three-digit number. The country code is optional.
# For example, en is English (no country specified), and en-US is English
# Convert the Enum of supported locales into a list of language tags.
# Fancy way: locales_to_try = [locale.value for locale in SupportedLocales]
locales_to_try: List[str] = list()
for locale in SupportedLocales:
locales_to_try.append(str(locale.value))
# Get the best match for the Accept-Language header.
locale = request.accept_languages.best_match(locales_to_try)
logger.debug("Best match for Accept-Language header (%s) is %s.",
request.accept_languages, locale)
return locale
def get_timezone():
"""
Get the timezone for user.
Looks at the user model for the user's preferred timezone. If the user has
not set a preferred timezone, use the default timezone.
"""
# if a user is logged in, use the timezone from the user settings
if current_user.is_authenticated and current_user.timezone:
logger.debug("Using locale %s from user settings.", current_user.timezone)
return current_user.timezone
# Try detecting the timezone from the user's locale.
locale = get_locale()
choises = [(k, 1) for k in TIMEZONES.keys()]
# Use the best_match method from the LanguageAccept class to get the best
# match for the user's locale.
best_match = LanguageAccept(choises).best_match([locale])
if best_match:
logger.debug("Guessing timezone %s from locale %s.", TIMEZONES[best_match], locale)
return TIMEZONES[best_match]
from datetime import datetime, timedelta
import logging
from typing import Optional
from flask import (
Blueprint, flash, g, redirect, render_template, request, url_for
Blueprint, flash, redirect, render_template, request, url_for, jsonify, current_app
)
from flask_babel import _, get_locale, lazy_gettext
from werkzeug.exceptions import abort
from .auth import login_required
from .models import Item
from markupsafe import Markup
from .auth import login_required, current_user
from .models import Bid, Item
from .currency import (
convert_currency,
format_converted_currency,
convert_from_currency,
get_currencies,
get_preferred_currency,
REF_CURRENCY,
)
from .notification import send_notification
bp = Blueprint('items', __name__)
api = Blueprint('api_items', __name__, url_prefix='/api/items')
logger = logging.getLogger(__name__)
MIN_BID_INCREMENT = 1
def get_item(id):
......@@ -17,11 +36,117 @@ def get_item(id):
print("Error getting item:", exc)
abort(404)
if item.seller == g.user:
if item.seller == current_user:
return item
abort(403)
def get_winning_bid(item: Item) -> Optional[Bid]:
"""
Return the (currently) winning bid for the given item.
If there are no bids, or the item is not yet closed, return None.
:param item: The item to get the winning bid for.
:return: The winning bid, or None.
"""
winning_bid = None
# If the item is closed, return the winning bid
if item.closed and item.winning_bid:
return item.winning_bid
# Sanity check: if the item is not closed, it should not have a winning bid
assert not item.closed or not (not item.closed and winning_bid), "Item is not closed, but has a winning bid"
try:
# Get the highest bid that was placed before the item closed
winning_bid = Bid.objects(item=item) \
.filter(created_at__lt=item.closes_at) \
.order_by('-amount') \
.first()
except Exception as exc:
logger.warning("Error getting winning bid: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
return winning_bid
def get_item_price(item: Item) -> int:
"""
Return the current price of the given item.
If there are no bids, return the starting bid.
:param item: The item to get the price for.
:return: The current price.
"""
winning_bid = get_winning_bid(item)
if winning_bid:
return winning_bid.amount + MIN_BID_INCREMENT
else:
return item.starting_bid
def handle_item_closing(item):
"""
Handle the closing of an item.
Checks if the item is not closed yet, but should be closed now. If so,
closes the item, and send notifications to the seller and the buyer.
:param item: The item to handle.
"""
# Handle the closing of an item
if not item.is_open and not item.closed:
logger.info("Closing item %r (%s)", item.title, item.id, extra={
'item_id': item.id,
'item_title': item.title,
'item_closes_at': item.closes_at,
})
# Get the winning bid
winning_bid = get_winning_bid(item)
if winning_bid:
item.winning_bid = winning_bid
# Send a notifications to the seller and the buyer
# lazy_gettext() is used to delay the translation until the message is sent
# Markup.escape() is used to escape strings, to prevent XSS attacks
send_notification(
item.seller,
title=lazy_gettext("Your item was sold"),
message=lazy_gettext("Your item <em>%(title)s</em> was sold to %(buyer)s for %(price)s.",
title=Markup.escape(item.title),
buyer=Markup.escape(winning_bid.bidder.email),
price=Markup.escape(winning_bid.amount)),
)
send_notification(
winning_bid.bidder,
title=lazy_gettext("You won an item"),
message=lazy_gettext("You won the item <em>%(title)s</em> for %(price)s.",
title=Markup.escape(item.title),
price=Markup.escape(winning_bid.amount)),
)
else:
# If there is no winning bid, send a notification to the seller
send_notification(
item.seller,
title=lazy_gettext("Your item was not sold"),
message=lazy_gettext("Your item <em>%(title)s</em> was not sold.",
title=Markup.escape(item.title)),
)
# Close the item
item.closed = True
item.save()
@bp.route("/", defaults={'page': 1})
@bp.route("/items/<int:page>")
def index(page=1):
......@@ -50,34 +175,90 @@ def sell():
if request.method == 'POST':
title = request.form['title']
description = request.form['description']
starting_bid = int(request.form['starting_bid'])
currency = request.form.get('currency', REF_CURRENCY)
starting_bid = convert_from_currency(request.form['starting_bid'], currency)
error = None
if not title:
error = 'Title is required.'
if not starting_bid or starting_bid < 1:
error = 'Starting bid must be greater than 0.'
error = Markup(_("Starting bid must be greater than %(amount)s.", amount=format_converted_currency(1, currency)))
if error is None:
try:
sale_length = timedelta(days=1)
if current_app.config['DEBUG'] and request.form.get("flash-sale"):
sale_length = timedelta(seconds=20)
item = Item(
title=title,
description=description,
starting_bid=starting_bid,
seller=g.user,
closes_at=datetime.utcnow() + timedelta(days=1)
seller=current_user,
closes_at=datetime.utcnow() + sale_length,
)
item.save()
flash(_('Item listed successfully!'))
except Exception as exc:
error = f"Error creating item: {exc!s}"
error = _("Error creating item: %(exc)s", exc=exc)
logger.warning("Error creating item: %s", exc, exc_info=True, extra={
'title': title,
'description': description,
'starting_bid': starting_bid,
})
else:
return redirect(url_for('items.index'))
print(error)
flash(error)
print(error)
flash(error, category='error')
# Get the list of currencies, and map them to their localized names
currencies = {}
names = get_locale().currencies
for currency in get_currencies():
currencies[currency] = names.get(currency, currency)
return render_template('items/sell.html', currencies=currencies, default_currency=get_preferred_currency())
@bp.route('/item/<id>')
def view(id):
"""
Item view page.
Displays the item details, and a form to place a bid.
"""
item = Item.objects.get_or_404(id=id)
return render_template('items/sell.html')
# !!! This is disabled as it might cause race conditions
# !!! if multiple users are accessing the same item at the same time
# Check if the item is closed, and handle it if so.
#handle_item_closing(item)
# Set the minumum price for the bid form from the current winning bid
winning_bid = get_winning_bid(item)
min_bid = get_item_price(item)
local_currency = get_preferred_currency()
local_min_bid = convert_currency(min_bid, local_currency)
if item.closes_at < datetime.utcnow():
if winning_bid and winning_bid.bidder == current_user:
flash(_("Congratulations! You won the auction!"), "success")
else:
flash(_("This item is no longer on sale."))
elif item.closes_at < datetime.utcnow() + timedelta(hours=1):
# Dark pattern to show enticing message to user
flash(_("This item is closing soon! Act now! Now! Now!"))
return render_template('items/view.html',
item=item, min_bid=min_bid,
local_min_bid=local_min_bid,
local_currency=local_currency)
@bp.route('/item/<id>/update', methods=('GET', 'POST'))
......@@ -91,20 +272,23 @@ def update(id):
error = None
if not title:
error = 'Title is required.'
error = _('Title is required.')
try:
item.title = title
item.description = description
item.save()
except Exception as exc:
error = f"Error updating item: {exc!s}"
error = _("Error updating item: %(exc)s", exc=exc)
logger.warning("Error updating item: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
else:
flash("Item updated successfully!")
flash(_("Item updated successfully!"))
return redirect(url_for('items.index'))
print(error)
flash(error)
flash(error, category='error')
return render_template('items/update.html', item=item)
......@@ -116,9 +300,149 @@ def delete(id):
try:
item.delete()
except Exception as exc:
error = f"Error deleting item: {exc!s}"
print(error)
flash(error)
logger.warning("Error deleting item: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
flash(_("Error deleting item: %(exc)s", exc=exc), category='error')
else:
flash("Item deleted successfully!")
flash(_("Item deleted successfully!"))
return redirect(url_for('items.index'))
@bp.route('/item/<id>/bid', methods=('POST',))
@login_required
def bid(id):
"""
Bid on an item.
If the bid is valid, create a new bid and redirect to the item view page.
Otherwise, display an error message and redirect back to the item view page.
:param id: The id of the item to bid on.
:return: A redirect to the item view page.
"""
item = Item.objects.get_or_404(id=id)
min_amount = get_item_price(item)
local_amount = request.form['amount']
currency = request.form.get('currency', REF_CURRENCY)
amount = convert_from_currency(local_amount, currency)
if amount < min_amount:
flash(_("Bid must be at least %(min_amount)s", min_amount=format_converted_currency(min_amount)))
return redirect(url_for('items.view', id=id))
if item.closes_at < datetime.utcnow():
flash("This item is no longer on sale.")
return redirect(url_for('items.view', id=id))
try:
# Notice: if you have integrated the flask-login extension, use current_user
# instead of g.user
bid = Bid(
item=item,
bidder=current_user,
amount=amount,
)
bid.save()
except Exception as exc:
flash(_("Error placing bid: %(exc)s", exc=exc))
else:
flash(_("Bid placed successfully!"))
return redirect(url_for('items.view', id=id))
@api.route('<id>/bids', methods=('GET',))
@login_required
def api_item_bids(id):
"""
Get the bids for an item.
:param id: The id of the item to get bids for.
:return: A JSON response containing the bids.
"""
item = Item.objects.get_or_404(id=id)
bids = []
for bid in Bid.objects(item=item).order_by('-amount'):
bids.append(bid.to_json())
return jsonify({
'success': True,
'bids': bids
})
@api.route('<id>/bids', methods=('POST',))
@login_required
def api_item_place_bid(id):
"""
Place a bid on an item.
If the bid is valid, create a new bid and return the bid.
Otherwise, return an error message.
Only accepts `REF_CURRENCY` bids.
:param id: The id of the item to bid on.
:return: A JSON response containing the bid.
"""
item = Item.objects.get_or_404(id=id)
min_amount = get_item_price(item)
try:
amount = int(request.form['amount'])
except KeyError:
return jsonify({
'success': False,
'error': _("Missing required argument %(argname)s", argname='amount')
})
except ValueError:
return jsonify({
'success': False,
'error': _("Invalid value for argument %(argname)s", argname='amount')
})
except Exception as exc:
return jsonify({
'success': False,
'error': _("Error parsing argument %(argname)s: %(exc)s", argname='amount', exc=exc)
})
if amount < min_amount:
return jsonify({
'success': False,
'error': _("Bid must be at least %(min_amount)s", min_amount=min_amount)
})
if item.closes_at < datetime.utcnow():
return jsonify({
'success': False,
'error': _("This item is no longer on sale.")
})
try:
bid = Bid(
item=item,
bidder=current_user,
amount=amount,
)
bid.save()
except Exception as exc:
logger.error("Error placing bid: %s", exc, exc_info=True, extra={
'item_id': item.id,
'bidder_id': current_user.id,
'amount': amount,
})
return jsonify({
'success': False,
'error': _("Error placing bid: %(exc)s", exc=exc)
})
return jsonify({
'success': True,
'bid': bid.to_mongo().to_dict()
})
......@@ -76,7 +76,7 @@ def init_logging(app: Flask):
# Setup sentry logging
sentry_dsn = app.config.get("SENTRY_DSN")
release = app.config.get("CI_COMMIT_SHA", get_version() or "dev")
enviroment = app.config.get("CI_ENVIRONMENT_NAME")
enviroment = app.config.get("SENTRY_ENVIRONMENT", "production")
if sentry_dsn:
sentry = sentry_sdk.init(
......
from datetime import datetime
from secrets import token_urlsafe
from urllib.parse import urlencode
from flask import url_for
from markupsafe import Markup
from .db import db
from mongoengine import (
......@@ -7,21 +12,78 @@ from mongoengine import (
ReferenceField,
DateTimeField,
EmailField,
BooleanField,
EnumField,
)
from mongoengine.queryset import CASCADE
from flask_login import UserMixin
from bson import ObjectId
from .i18n import SupportedLocales
class User(db.Document):
class User(UserMixin, db.Document):
"""
Model representing a user of the auction site.
"""
id: ObjectId
email = EmailField(required=True, unique=True)
"The user's email address."
password = StringField(required=True)
locale = StringField(default=SupportedLocales.EN.value)
currency = StringField(max_length=3)
"The user's preferred currency."
timezone = StringField(max_length=50)
created_at = DateTimeField(required=True, default=datetime.utcnow)
is_disabled = BooleanField(default=False)
"Whether the user is disabled."
@property
def image_url(self) -> str:
"""
Return the URL of the user's avatar.
"""
import hashlib
digest = hashlib.md5(self.email.lower().encode("utf-8")).hexdigest()
default = url_for("static", filename="img/default-profile.png", _external=True)
params = urlencode({"s": 200, "d": default})
return f"https://www.gravatar.com/avatar/{digest}?{params}"
@property
def is_active(self) -> bool:
"""
Return whether the user is active.
This is used by Flask-Login to determine whether the user is
allowed to log in.
"""
return not self.is_disabled
def get_id(self) -> str:
"""
Return the user's id as a string.
"""
return str(self.id)
def __html__(self) -> str:
"""
Return a html representation of the user.
"""
url = url_for("auth.profile", email=self.email)
username = self.email.split("@")[0]
safe_username = Markup.escape(username)
return f"<a href=\"{url}\" class=\"profile-link\">@{safe_username}</a>"
class Item(db.Document):
"""
......@@ -41,6 +103,104 @@ class Item(db.Document):
starting_bid = IntField(required=True, min_value=0)
seller = ReferenceField(User, required=True)
winning_bid = ReferenceField("Bid")
closed = BooleanField(default=False)
"Whether the item has been closed."
created_at = DateTimeField(required=True, default=datetime.utcnow)
closes_at = DateTimeField()
@property
def is_open(self) -> bool:
"""
Return whether the item is open for bidding.
"""
if self.closed:
return False
return self.closes_at > datetime.utcnow()
class Bid(db.Document):
"""
A model for bids on items.
"""
meta = {"indexes": [
{"fields": [
"amount",
"item",
"created_at",
]}
]}
amount = IntField(required=True, min_value=0)
"Indicates the value of the bid."
bidder = ReferenceField(User, required=True)
"User who placed the bid."
item = ReferenceField(Item, required=True)
"Item that the bid is for."
created_at = DateTimeField(required=True, default=datetime.utcnow)
"Date and time that the bid was placed."
class AccessToken(db.Document):
"""
Access token for a user.
This is used to authenticate API requests.
"""
meta = {"indexes": [
{"fields": [
"token",
"user",
"expires",
]}
]}
name = StringField(max_length=100, required=True)
"Human-readable name for the token."
user = ReferenceField(User, required=True, reverse_delete_rule=CASCADE)
"User that the token is for."
token = StringField(required=True, unique=True, default=token_urlsafe)
"The token string."
last_used_at = DateTimeField(required=False)
"Date and time that the token was last used."
created_at = DateTimeField(required=True, default=datetime.utcnow)
"Date and time that the token was created."
expires = DateTimeField(required=False)
"Date and time that the token expires."
class Notification(db.Document):
"""
Represents a message between two users, or a message to a user from
the system.
"""
meta = {"indexes": [
{"fields": [
"user",
"read_at",
"created_at",
]}
]}
id: ObjectId
user = ReferenceField(User, required=True, reverse_delete_rule=CASCADE)
category = StringField(max_length=100, default="message")
message = StringField(required=True)
title = StringField(max_length=120)
created_at = DateTimeField(required=True, default=datetime.utcnow)
read_at = DateTimeField(required=False)
"""
This module provides a way to send notifications to users.
"""
import dataclasses
from datetime import datetime
import logging
from flask import get_flashed_messages, jsonify, Blueprint
from flask_login import current_user, login_required
from flask_babel import force_locale, lazy_gettext
from .models import Notification, User
bp = Blueprint('notification', __name__, url_prefix='/')
logger = logging.getLogger(__name__)
@dataclasses.dataclass
class Message:
"""
Represents a message to be displayed to the user.
"""
message: str
category: str
# Additional field for the template
title: str = lazy_gettext("Message")
created_at: datetime = dataclasses.field(default_factory=datetime.utcnow)
def init_notification(app):
"""
Initialize the notifications module.
"""
app.register_blueprint(bp)
app.jinja_env.globals.update(get_notifications=get_notifications)
def send_notification(user, message, category="message", title=None):
"""
Send a notification to the given user.
:param user: The user to send the message to.
:param subject: The subject of the message.
:param message: The message to send.
"""
# Change the locale to the message recipient locale.
with force_locale(user.locale):
notification = Notification(
user=user,
message=str(message),
category=category,
title=str(title),
)
notification.save()
def get_notifications(user: User = current_user) -> list[Message]:
"""
Get the messages for the given user.
Flash messages are returned first, followed by database messages.
Messages are marked as read when they are retrieved.
Notice: the listing query might not return recently added messages.
:param user: The user to get the messages for.
:return: A list of messages.
"""
messages = []
# Get the flash messages first
for category, message in get_flashed_messages(with_categories=True):
messages.append(Message(message, category))
if user is None or not user.is_authenticated:
logger.debug("User is not authenticated, returning flash messages.")
return messages
# Get the database messages
notifications = Notification.objects(user=user, read_at=None).order_by('-created_at').all()
for notification in notifications:
messages.append(Message(notification.message, notification.category, notification.title))
# Mark the messages as read
notifications.update(read_at=datetime.utcnow())
return messages
@bp.route('/notifications.json', methods=('GET',))
@login_required
def user_notifications():
"""
Show the form to send a message to the given user.
"""
user = current_user
# Convert the notifications to a list of dictionaries
notifications = []
for notification in get_notifications(user):
notifications.append(dataclasses.asdict(notification))
return jsonify({
"success": True,
"notifications": notifications,
})
"""
This module contains the APScheduler extension.
This extension is used to schedule background tasks.
"""
from datetime import datetime, timedelta
import logging
from random import randint
from flask_apscheduler import APScheduler
from apscheduler.schedulers import SchedulerAlreadyRunningError
from mongoengine import signals, Q
from .models import Item
from .items import handle_item_closing
logger = logging.getLogger(__name__)
# Having the scheduler as a global variable is not ideal, but it's the easiest
# way to make it accessible.
scheduler = APScheduler()
def init_scheduler(app):
"""
Initialize the APScheduler extension.
This function is meant to be called from the create_app() function.
"""
try:
scheduler.init_app(app)
# Due to the scheduler being utilised as global variable, check if
# the scheduler is already running. If it is, then it means that the
# scheduler has already been initialised.
if not scheduler.running and not app.config.get('TESTING'):
# Add a signal handler to schedule a task to close the item when the auction
# ends.
signals.post_save.connect(_schedule_item_closing_task, sender=Item)
# Add a batch task to close expired bids every 15 minutes. This is to ensure
# that the bids are closed even if the server is restarted.
scheduler.add_job(trigger='interval', minutes=15,
func=_close_items,
id='close-items')
# Add a task to update the currency rates from the European Central Bank every
# day at random time between 5:00 and 5:59.
scheduler.add_job(trigger='cron', hour=5, minute=randint(0, 59),
func=_update_currency_rates,
id='update-currency-rates')
with app.app_context():
scheduler.start()
logger.debug('APScheduler started')
except SchedulerAlreadyRunningError:
logger.debug('APScheduler already running')
except Exception as exc:
logger.exception("Failed to initialize APScheduler: %s", exc)
return app
def _handle_item_closing(item_id):
"""
Handle the closing of an item.
This function is meant to be run by the APScheduler, and is not meant to be
called directly.
"""
with scheduler.app.app_context():
item = Item.objects.get(id=item_id)
handle_item_closing(item)
# Even as this is named function, it's used as a closure, so it can access
# the scheduler variable.
def _schedule_item_closing_task(sender, document, **kwargs): # pylint: disable=unused-argument
"""
Schedule a task to close the item when the auction ends.
This function is meant to be connected to the post_save signal of the Item
model.
"""
if not document.closes_at:
# The item does not have an auction end time, so there is no need to
# schedule a task to close it.
logger.debug("Not scheduling closing, as item %s does not have an auction end time", document.id)
return
if document.closed:
# The item is already closed, so there is no need to schedule a task to
# close it.
return
logger.debug('Scheduling task to close item %s', document.id)
scheduler.add_job(
func=_handle_item_closing,
args=(document.id,),
trigger='date',
run_date=document.closes_at + timedelta(seconds=1),
id=f'close-item-{document.id}',
)
def _close_items():
"""
Close expired bids.
This function is meant to be run by the APScheduler, and is not meant to be
called directly.
"""
with scheduler.app.app_context():
logger.info("Running scheduled task 'close-items'")
# Get items that are past the closing date, and are not already closed
closes_before = datetime.utcnow() + timedelta(seconds=2)
items = Item.objects(Q(closed=None) | Q(closed=False), closes_at__lt=closes_before).all()
logger.debug("Closing %d items", len(items))
# Close each item
for item in items:
try:
# Make sure item is not already closed
if item.closed:
continue
handle_item_closing(item)
except Exception as exc:
logger.error("Error closing items: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
def _update_currency_rates():
"""
Update the currency rates from the European Central Bank.
This function is meant to be run by the APScheduler, and is not meant to be
called directly.
"""
from .currency import fetch_currency_file
with scheduler.app.app_context():
logger.debug("Running scheduled task 'update-currency-rates'")
fetch_currency_file()
src/tjts5901/static/flags/en_GB.png

9.91 KiB