Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • startuplab/courses/tjts5901-continuous-software-engineering/TJTS5901-K23_template
  • planet-of-the-apes/tjts-5901-apeuction
  • uunot-yliopiston-leivissa/tjts-5901-uunot
  • contain-the-cry/tjts-5901-auction-system
  • avengers/avengers
  • cse6/cse-6
  • 13th/13-sins-of-gitlab
  • fast-and-furious/fast-and-furious
  • back-to-the-future/delorean-auction
  • monty-pythons-the-meaning-of-life/the-meaning-of-life
  • team-atlantis/the-empire
  • code-with-the-wind/auction-project
  • the-pirates/the-pirates
  • do-the-right-thing/do-the-right-thing
  • inception/inception
  • the-social-network-syndicate/the-social-auction-network
  • team-the-hunt-for-red-october/tjts-5901-k-23-red-october
  • good-on-paper/good-paper-project
  • desperados/desperados
19 results
Show changes
Showing
with 9377 additions and 0 deletions
"""
Currency module.
This module contains the currency module, which is used to convert currencies.
Uses the ECB (European Central Bank) as the source of currency conversion rates.
To update the currency conversion rates, run the following command:
$ flask update-currency-rates
"""
from decimal import Decimal
import logging
import os
from pathlib import Path
from zipfile import ZipFile
import urllib.request
import click
from currency_converter import (
SINGLE_DAY_ECB_URL,
CurrencyConverter,
)
from flask_babel import (
get_locale,
format_currency,
)
from babel.numbers import get_territory_currencies, parse_decimal
from flask import (
Flask,
current_app,
render_template,
)
from markupsafe import Markup
from .auth import current_user
REF_CURRENCY = 'EUR'
"Reference currency for the currency converter."
logger = logging.getLogger(__name__)
class CurrencyProxy:
"""
Proxy for the currency converter.
This class is used to proxy the currency converter instance. This is to
ensure that the currency converter is only initialized when it is actually
used, and the used conversion list is the most up-to-date.
"""
def __init__(self, app: Flask):
self._converter = None
self._app = app
self._converter_updated = 0
self._dataset_updated = 0
def get_currency_converter(self) -> CurrencyConverter:
"""
Get a currency converter instance.
Automatically updates the currency converter if the dataset has been
updated.
Exceptions:
RuntimeError: If the currency file is not configured.
FileNotFoundError: If the currency file does not exist.
:return: A currency converter instance.
"""
if not (conversion_file := self._app.config.get('CURRENCY_FILE')):
raise RuntimeError('Currency file not configured.')
# Initialize the currency converter if it has not been initialized yet,
# or if the dataset has been updated.
self._dataset_updated = Path(conversion_file).stat().st_mtime
if self._converter is None or self._dataset_updated > self._converter_updated:
logger.info("Initializing currency converter with file %s.", conversion_file)
self._converter = CurrencyConverter(
currency_file=conversion_file,
ref_currency=REF_CURRENCY,
)
self._converter_updated = self._dataset_updated
return self._converter
def __getattr__(self, name):
"""
Proxy all other attributes to the currency converter.
"""
return getattr(self.get_currency_converter(), name)
def init_currency(app: Flask):
"""
Initialize the currency module.
This function initializes the currency module, and registers the currency
converter as an extension.
:param app: The Flask application.
:return: None
"""
# Set default currency file path
app.config.setdefault('CURRENCY_FILE', app.instance_path + '/currency.csv')
# Register the currency converter as an extension
app.extensions['currency_converter'] = CurrencyProxy(app)
# Register the currency converter as a template filter
app.add_template_filter(format_converted_currency, name='localcurrency')
app.cli.add_command(update_currency_rates)
def format_converted_currency(value, currency=None, **kwargs):
"""
Render a currency value in the preferred currency.
This function renders a currency value in the preferred currency for the
current locale. If the preferred currency is not the reference currency,
the value is converted to the preferred currency.
"""
if currency is None:
currency = get_preferred_currency()
# Convert the value to the preferred currency
local_value = convert_currency(value, currency)
# Format the value
html = render_template("money-tag.html",
base_amount=format_currency(value, currency=REF_CURRENCY, format_type='name', **kwargs),
local_amount=format_currency(local_value, currency=currency, **kwargs))
return Markup(html)
def convert_currency(value, currency=None, from_currency=REF_CURRENCY):
"""
Convert a currency value to the preferred currency.
This function converts a currency value to the preferred currency for the
current locale. If the preferred currency is not the reference currency,
the value is converted to the preferred currency.
"""
if currency != REF_CURRENCY:
return current_app.extensions['currency_converter'].convert(value, from_currency, currency)
return value
def convert_from_currency(value, currency) -> Decimal:
"""
Parses the localized currency value and converts it to the reference currency.
"""
locale = get_locale()
amount = parse_decimal(value, locale=locale)
if currency != REF_CURRENCY:
amount = Decimal(current_app.extensions['currency_converter'].convert(amount, currency, REF_CURRENCY))
return amount
def get_currencies():
"""
Get the list of supported currencies.
"""
return current_app.extensions['currency_converter'].currencies
def get_preferred_currency():
"""
Get the preferred currency.
This function returns the preferred currency for the current locale.
:return: The preferred currency.
"""
if current_user.is_authenticated and current_user.currency:
return str(current_user.currency)
# Fall back to the default currency for the locale
if territory := get_locale().territory:
currency = get_territory_currencies(territory)[0]
if currency in get_currencies():
return currency
else:
logger.warning("Default currency %s is not supported, falling back to %s.", currency, REF_CURRENCY)
return REF_CURRENCY
@click.command()
def update_currency_rates():
"""
Update currency file from the European Central Bank.
This command is meant to be run from the command line, and is not meant to be
used in the application:
$ flask update-currency-rates
:return: None
"""
click.echo('Updating currency file from the European Central Bank...')
fetch_currency_file()
click.echo('Done.')
def fetch_currency_file():
"""
Fetch the currency file from the European Central Bank.
This function fetches the currency file from the European Central Bank, and
stores it in the configured currency file path.
"""
from tempfile import NamedTemporaryFile # pylint: disable=import-outside-toplevel
fd, _ = urllib.request.urlretrieve(SINGLE_DAY_ECB_URL)
with ZipFile(fd) as zf:
file_name = zf.namelist().pop()
# Create a temporary file to store the currency file, to avoid corrupting
# the existing file if the download fails, or while writing the file.
file_path = os.path.dirname(current_app.config['CURRENCY_FILE'])
if not os.path.exists(file_path):
os.makedirs(file_path)
with NamedTemporaryFile(dir=file_path, delete=False) as f:
f.write(zf.read(file_name))
f.flush()
# Move the temporary file to the configured currency file path
os.rename(f.name, current_app.config['CURRENCY_FILE'])
import logging
from os import environ
from flask_mongoengine import MongoEngine
db = MongoEngine()
logger = logging.getLogger(__name__)
def init_db(app):
"""
Initialize the database connection.
Fetches the database connection string from the environment variable `MONGO_URL`
and, if present, sets the `MONGODB_SETTINGS` configuration variable to use it.
"""
# To keep secrets private, we use environment variables to store the database connection string.
# `MONGO_URL` is expected to be a valid MongoDB connection string, see: blah blah blah
mongodb_url = environ.get("MONGO_URL")
if mongodb_url is not None:
app.config["MONGODB_SETTINGS"] = {
"host": mongodb_url,
}
logger.info("Database connection string found, using it.",
# You can use the `extra` parameter to add extra information to the log message.
# This is useful for debugging, but should be removed in production.
extra={"MONGO_URL": mongodb_url} if app.debug else {})
else:
logger.warning("No database connection string found in env, using defaults.",
extra={"MONGODB_SETTINGS": app.config.get("MONGODB_SETTINGS")} if app.debug else {})
db.init_app(app)
"""
Internationalisation and localisation support for the application.
"""
from enum import Enum
import os
from typing import List
from flask_babel import Babel, get_locale as get_babel_locale
from babel import Locale
from babel import __version__ as babel_version
from flask import (
Flask,
g,
request,
session,
)
from werkzeug.datastructures import LanguageAccept
from flask_login import current_user
import logging
logger = logging.getLogger(__name__)
class SupportedLocales(Enum):
"""
Supported locales for the application.
The values are the locale identifiers used by the Babel library.
Order here determines the order in which the locales are presented to the
user, and the order in which the locales are tried when the user does not
specify a preferred locale.
"""
FI = "fi_FI.UTF-8"
"Finnish (Finland)"
SV = "sv_SE.UTF-8"
"Swedish (Sweden)"
EN = "en_GB.UTF-8"
"English (United Kingdom)"
# EN_US = "en_US.UTF-8"
# "English (United States)"
TLH = "tlh"
"Klingon"
TIMEZONES = {
"""
Timezones for supported locales.
The values are the timezone identifiers used by the Babel library.
This approach doesnt work for countries that have multiple timezones, like
the US.
"""
"fi_FI": "Europe/Helsinki",
"sv_SE": "Europe/Stockholm",
"en_GB": "Europe/London",
"tlh": "America/New_York",
}
def init_babel(flask_app: Flask):
"""
Initialize the Flask-Babel extension.
"""
# Monkeypatch klingon support into babel
# Klingon reverts to English
hack_babel_core_to_support_custom_locales({"tlh": "en"})
# Configure the Flask-Babel extension.
# Try setting the default locale from underlying OS. Falls back into English.
system_language = Locale.default().language
translation_dir = os.path.join(os.path.dirname(__file__), "translations")
flask_app.config.setdefault("BABEL_TRANSLATION_DIRECTORIES", translation_dir)
flask_app.config.setdefault("BABEL_DEFAULT_LOCALE", system_language)
babel = Babel(flask_app, locale_selector=get_locale, timezone_selector=get_timezone)
# Register `locales` as jinja variable to be used in templates. Uses the
# `Locale` class from the Babel library, so that the locale names can be
# translated.
locales = {}
for locale in SupportedLocales:
locales[locale.value] = Locale.parse(locale.value)
flask_app.jinja_env.globals.update(locales=locales)
# Register `get_locale` as jinja function to be used in templates
flask_app.jinja_env.globals.update(get_locale=get_babel_locale)
# If url contains locale parameter, set it as default in session
@flask_app.before_request
def set_locale():
if request.endpoint != "static":
if locale := request.args.get('locale'):
if locale in (str(l) for l in locales.values()):
logger.debug("Setting locale %s from URL.", locale)
session['locale'] = locale
else:
logger.warning("Locale %s not supported.", locale)
logger.info("Initialized Flask-Babel extension %s.", babel_version,
extra=flask_app.config.get_namespace("BABEL_"))
return babel
def hack_babel_core_to_support_custom_locales(custom_locales: dict):
""" Hack Babel core to make it support custom locale names
Based on : https://github.com/python-babel/babel/issues/454
Patch mechanism provided by @kolypto
Args:
custom_locales: Mapping from { custom name => ordinary name }
"""
from babel.core import get_global
# In order for Babel to know "en_CUSTOM", we have to hack its database and put our custom
# locale names there.
# This database is pickle-loaded from a .dat file and cached, so we only have to do it once.
db = get_global('likely_subtags')
for custom_name in custom_locales:
db[custom_name] = custom_name
# Also, monkey-patch the exists() and load() functions that load locale data from 'babel/locale-data'
import babel.localedata
# Originals
o_exists, o_load, o_parse_locale = babel.localedata.exists, babel.localedata.load, babel.core.parse_locale
# Definitions
def exists(name):
# Convert custom names to normalized names
name = custom_locales.get(name, name)
return o_exists(name)
def load(name, merge_inherited=True):
# Convert custom names to normalized names
original_name = custom_locales.get(name, name)
l_data = o_load(original_name, merge_inherited)
l_data['languages']['tlh'] = 'Klingon'
l_data.update({
'locale_id': name,
})
return l_data
# Definitions
def parse_locale(name, sep='_'):
# Convert custom names to normalized names
name = custom_locales.get(name, name)
l_data = o_parse_locale(name, sep)
return l_data
# Make sure we do not patch twice
if o_exists.__module__ != __name__:
babel.localedata.exists = exists
babel.localedata.load = load
# if o_parse_locale.__module__ != __name__:
# babel.core.parse_locale = parse_locale
# See that they actually exist
for normalized_name in custom_locales.values():
assert o_exists(normalized_name)
def get_locale():
"""
Get the locale for user.
Looks at the user model for the user's preferred locale. If the user has not
set a preferred locale, check the browser's Accept-Language header. If the
browser does not specify a preferred locale, use the default locale.
todo: What happens if the user's preferred locale support is dropped from
todo: the application?
:return: Suitable locale for the user.
"""
# if a locale was stored in the session, use that
if locale := session.get('locale'):
logger.debug("Setting locale %s from session.", locale)
return locale
# if a user is logged in, use the locale from the user settings
if current_user.is_authenticated and current_user.locale:
logger.debug("Using locale %s from user settings.", current_user.locale)
return current_user.locale
# otherwise try to guess the language from the user accept header the
# browser transmits.
# The Accept-Language header is a list of languages the user prefers,
# ordered by preference. The first language is the most preferred.
# The language is specified as a language tag, which is a combination of
# a language code and a country code, separated by a hyphen.
# For example, en-GB is English (United Kingdom).
# The language code is a two-letter code, and the country code is a
# two-letter code, or a three-digit number. The country code is optional.
# For example, en is English (no country specified), and en-US is English
# Convert the Enum of supported locales into a list of language tags.
# Fancy way: locales_to_try = [locale.value for locale in SupportedLocales]
locales_to_try: List[str] = list()
for locale in SupportedLocales:
locales_to_try.append(str(locale.value))
# Get the best match for the Accept-Language header.
locale = request.accept_languages.best_match(locales_to_try)
logger.debug("Best match for Accept-Language header (%s) is %s.",
request.accept_languages, locale)
return locale
def get_timezone():
"""
Get the timezone for user.
Looks at the user model for the user's preferred timezone. If the user has
not set a preferred timezone, use the default timezone.
"""
# if a user is logged in, use the timezone from the user settings
if current_user.is_authenticated and current_user.timezone:
logger.debug("Using locale %s from user settings.", current_user.timezone)
return current_user.timezone
# Try detecting the timezone from the user's locale.
locale = get_locale()
choises = [(k, 1) for k in TIMEZONES.keys()]
# Use the best_match method from the LanguageAccept class to get the best
# match for the user's locale.
best_match = LanguageAccept(choises).best_match([locale])
if best_match:
logger.debug("Guessing timezone %s from locale %s.", TIMEZONES[best_match], locale)
return TIMEZONES[best_match]
from datetime import datetime, timedelta
import logging
from typing import Optional
from flask import (
Blueprint, flash, redirect, render_template, request, url_for, jsonify, current_app
)
from flask_babel import _, get_locale, lazy_gettext
from werkzeug.exceptions import abort
from markupsafe import Markup
from .auth import login_required, current_user
from .models import Bid, Item
from .currency import (
convert_currency,
format_converted_currency,
convert_from_currency,
get_currencies,
get_preferred_currency,
REF_CURRENCY,
)
from .notification import send_notification
bp = Blueprint('items', __name__)
api = Blueprint('api_items', __name__, url_prefix='/api/items')
logger = logging.getLogger(__name__)
MIN_BID_INCREMENT = 1
def get_item(id):
try:
item = Item.objects.get_or_404(id=id)
except Exception as exc:
print("Error getting item:", exc)
abort(404)
if item.seller == current_user:
return item
abort(403)
def get_winning_bid(item: Item) -> Optional[Bid]:
"""
Return the (currently) winning bid for the given item.
If there are no bids, or the item is not yet closed, return None.
:param item: The item to get the winning bid for.
:return: The winning bid, or None.
"""
winning_bid = None
# If the item is closed, return the winning bid
if item.closed and item.winning_bid:
return item.winning_bid
# Sanity check: if the item is not closed, it should not have a winning bid
assert not item.closed or not (not item.closed and winning_bid), "Item is not closed, but has a winning bid"
try:
# Get the highest bid that was placed before the item closed
winning_bid = Bid.objects(item=item) \
.filter(created_at__lt=item.closes_at) \
.order_by('-amount') \
.first()
except Exception as exc:
logger.warning("Error getting winning bid: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
return winning_bid
def get_item_price(item: Item) -> int:
"""
Return the current price of the given item.
If there are no bids, return the starting bid.
:param item: The item to get the price for.
:return: The current price.
"""
winning_bid = get_winning_bid(item)
if winning_bid:
return winning_bid.amount + MIN_BID_INCREMENT
else:
return item.starting_bid
def handle_item_closing(item):
"""
Handle the closing of an item.
Checks if the item is not closed yet, but should be closed now. If so,
closes the item, and send notifications to the seller and the buyer.
:param item: The item to handle.
"""
# Handle the closing of an item
if not item.is_open and not item.closed:
logger.info("Closing item %r (%s)", item.title, item.id, extra={
'item_id': item.id,
'item_title': item.title,
'item_closes_at': item.closes_at,
})
# Get the winning bid
winning_bid = get_winning_bid(item)
if winning_bid:
item.winning_bid = winning_bid
# Send a notifications to the seller and the buyer
# lazy_gettext() is used to delay the translation until the message is sent
# Markup.escape() is used to escape strings, to prevent XSS attacks
send_notification(
item.seller,
title=lazy_gettext("Your item was sold"),
message=lazy_gettext("Your item <em>%(title)s</em> was sold to %(buyer)s for %(price)s.",
title=Markup.escape(item.title),
buyer=Markup.escape(winning_bid.bidder.email),
price=Markup.escape(winning_bid.amount)),
)
send_notification(
winning_bid.bidder,
title=lazy_gettext("You won an item"),
message=lazy_gettext("You won the item <em>%(title)s</em> for %(price)s.",
title=Markup.escape(item.title),
price=Markup.escape(winning_bid.amount)),
)
else:
# If there is no winning bid, send a notification to the seller
send_notification(
item.seller,
title=lazy_gettext("Your item was not sold"),
message=lazy_gettext("Your item <em>%(title)s</em> was not sold.",
title=Markup.escape(item.title)),
)
# Close the item
item.closed = True
item.save()
@bp.route("/", defaults={'page': 1})
@bp.route("/items/<int:page>")
def index(page=1):
"""
Index page for items on sale.
Lists only items that are currently sale, with pagination.
"""
# Function used on propaedeutic
# items = Item.objects.all()
# Fetch items that are on sale currently, and paginate
# See: http://docs.mongoengine.org/projects/flask-mongoengine/en/latest/custom_queryset.html
items = Item.objects.filter(closes_at__gt=datetime.utcnow()) \
.order_by('-closes_at') \
.paginate(page=page, per_page=10)
return render_template('items/index.html',
items=items)
@bp.route('/sell', methods=('GET', 'POST'))
@login_required
def sell():
if request.method == 'POST':
title = request.form['title']
description = request.form['description']
currency = request.form.get('currency', REF_CURRENCY)
starting_bid = convert_from_currency(request.form['starting_bid'], currency)
error = None
if not title:
error = 'Title is required.'
if not starting_bid or starting_bid < 1:
error = Markup(_("Starting bid must be greater than %(amount)s.", amount=format_converted_currency(1, currency)))
if error is None:
try:
sale_length = timedelta(days=1)
if current_app.config['DEBUG'] and request.form.get("flash-sale"):
sale_length = timedelta(seconds=20)
item = Item(
title=title,
description=description,
starting_bid=starting_bid,
seller=current_user,
closes_at=datetime.utcnow() + sale_length,
)
item.save()
flash(_('Item listed successfully!'))
except Exception as exc:
error = _("Error creating item: %(exc)s", exc=exc)
logger.warning("Error creating item: %s", exc, exc_info=True, extra={
'title': title,
'description': description,
'starting_bid': starting_bid,
})
else:
return redirect(url_for('items.index'))
print(error)
flash(error, category='error')
# Get the list of currencies, and map them to their localized names
currencies = {}
names = get_locale().currencies
for currency in get_currencies():
currencies[currency] = names.get(currency, currency)
return render_template('items/sell.html', currencies=currencies, default_currency=get_preferred_currency())
@bp.route('/item/<id>')
def view(id):
"""
Item view page.
Displays the item details, and a form to place a bid.
"""
item = Item.objects.get_or_404(id=id)
# !!! This is disabled as it might cause race conditions
# !!! if multiple users are accessing the same item at the same time
# Check if the item is closed, and handle it if so.
#handle_item_closing(item)
# Set the minumum price for the bid form from the current winning bid
winning_bid = get_winning_bid(item)
min_bid = get_item_price(item)
local_currency = get_preferred_currency()
local_min_bid = convert_currency(min_bid, local_currency)
if item.closes_at < datetime.utcnow():
if winning_bid and winning_bid.bidder == current_user:
flash(_("Congratulations! You won the auction!"), "success")
else:
flash(_("This item is no longer on sale."))
elif item.closes_at < datetime.utcnow() + timedelta(hours=1):
# Dark pattern to show enticing message to user
flash(_("This item is closing soon! Act now! Now! Now!"))
return render_template('items/view.html',
item=item, min_bid=min_bid,
local_min_bid=local_min_bid,
local_currency=local_currency)
@bp.route('/item/<id>/update', methods=('GET', 'POST'))
@login_required
def update(id):
item = get_item(id)
if request.method == 'POST':
title = request.form['title']
description = request.form['description']
error = None
if not title:
error = _('Title is required.')
try:
item.title = title
item.description = description
item.save()
except Exception as exc:
error = _("Error updating item: %(exc)s", exc=exc)
logger.warning("Error updating item: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
else:
flash(_("Item updated successfully!"))
return redirect(url_for('items.index'))
print(error)
flash(error, category='error')
return render_template('items/update.html', item=item)
@bp.route('/item/<id>/delete', methods=('POST',))
@login_required
def delete(id):
item = get_item(id)
try:
item.delete()
except Exception as exc:
logger.warning("Error deleting item: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
flash(_("Error deleting item: %(exc)s", exc=exc), category='error')
else:
flash(_("Item deleted successfully!"))
return redirect(url_for('items.index'))
@bp.route('/item/<id>/bid', methods=('POST',))
@login_required
def bid(id):
"""
Bid on an item.
If the bid is valid, create a new bid and redirect to the item view page.
Otherwise, display an error message and redirect back to the item view page.
:param id: The id of the item to bid on.
:return: A redirect to the item view page.
"""
item = Item.objects.get_or_404(id=id)
min_amount = get_item_price(item)
local_amount = request.form['amount']
currency = request.form.get('currency', REF_CURRENCY)
amount = convert_from_currency(local_amount, currency)
if amount < min_amount:
flash(_("Bid must be at least %(min_amount)s", min_amount=format_converted_currency(min_amount)))
return redirect(url_for('items.view', id=id))
if item.closes_at < datetime.utcnow():
flash("This item is no longer on sale.")
return redirect(url_for('items.view', id=id))
try:
# Notice: if you have integrated the flask-login extension, use current_user
# instead of g.user
bid = Bid(
item=item,
bidder=current_user,
amount=amount,
)
bid.save()
except Exception as exc:
flash(_("Error placing bid: %(exc)s", exc=exc))
else:
flash(_("Bid placed successfully!"))
return redirect(url_for('items.view', id=id))
@api.route('<id>/bids', methods=('GET',))
@login_required
def api_item_bids(id):
"""
Get the bids for an item.
:param id: The id of the item to get bids for.
:return: A JSON response containing the bids.
"""
item = Item.objects.get_or_404(id=id)
bids = []
for bid in Bid.objects(item=item).order_by('-amount'):
bids.append(bid.to_json())
return jsonify({
'success': True,
'bids': bids
})
@api.route('<id>/bids', methods=('POST',))
@login_required
def api_item_place_bid(id):
"""
Place a bid on an item.
If the bid is valid, create a new bid and return the bid.
Otherwise, return an error message.
Only accepts `REF_CURRENCY` bids.
:param id: The id of the item to bid on.
:return: A JSON response containing the bid.
"""
item = Item.objects.get_or_404(id=id)
min_amount = get_item_price(item)
try:
amount = int(request.form['amount'])
except KeyError:
return jsonify({
'success': False,
'error': _("Missing required argument %(argname)s", argname='amount')
})
except ValueError:
return jsonify({
'success': False,
'error': _("Invalid value for argument %(argname)s", argname='amount')
})
except Exception as exc:
return jsonify({
'success': False,
'error': _("Error parsing argument %(argname)s: %(exc)s", argname='amount', exc=exc)
})
if amount < min_amount:
return jsonify({
'success': False,
'error': _("Bid must be at least %(min_amount)s", min_amount=min_amount)
})
if item.closes_at < datetime.utcnow():
return jsonify({
'success': False,
'error': _("This item is no longer on sale.")
})
try:
bid = Bid(
item=item,
bidder=current_user,
amount=amount,
)
bid.save()
except Exception as exc:
logger.error("Error placing bid: %s", exc, exc_info=True, extra={
'item_id': item.id,
'bidder_id': current_user.id,
'amount': amount,
})
return jsonify({
'success': False,
'error': _("Error placing bid: %(exc)s", exc=exc)
})
return jsonify({
'success': True,
'bid': bid.to_mongo().to_dict()
})
"""
==============
Logging module
==============
In this module we'll create a new :class:`~Logger` interface, using pythons inbuild :module:`logging` module.
By default flask sends messages to stdout.
To use this module, import it into your application and call :func:`~init_logging` function:
>>> from tjts5901.logging import init_logging
>>> init_logging(app)
To use logging in your application, import the logger instance, and use it as follows:
>>> import logging
>>> logger = logging.getLogger(__name__)
>>> logger.info("Hello world!")
"""
import logging
from os import environ
import sentry_sdk
from flask import Flask
from flask.logging import default_handler as flask_handler
from sentry_sdk.integrations.flask import FlaskIntegration
from sentry_sdk.integrations.pymongo import PyMongoIntegration
from .utils import get_version
def init_logging(app: Flask):
"""
Integrate our own logging interface into application.
To bind logger into your application instance use::
>>> init_logging(app)
:param app: :class:`~Flask` instance to use as logging basis.
"""
# Setup own logger instance. Usually you'll see something like
# >>> logger = logging.getLogger(__name__)
# where `__name__` reflects the package name, which is usually `"__main__"`,
# or in this exact case `tjts5901.logging`. I'll rather define static name.
# To get access to your logger in outside of module scope you can then
# use the same syntax as follows.
logger = logging.getLogger("tjts5901")
# If flask is running in debug mode, set our own handler to log also debug
# messages.
if app.config.get("DEBUG"):
logger.setLevel(level=logging.DEBUG)
# Add flask default logging handler as one of our target handlers.
# When changes to flask logging handler is made, our logging handler
# adapts automatically. Logging pipeline:
# our appcode -> our logger -> flask handler -> ????
logger.addHandler(flask_handler)
logger.debug("TJTS5901 Logger initialised.")
# Try to get enviroment name from different sources
if enviroment := environ.get('CI_ENVIRONMENT_NAME'):
enviroment = enviroment.lower()
elif app.testing:
enviroment = "testing"
elif app.debug:
enviroment = "development"
# Populate config with environment variables for sentry logging
app.config.setdefault('SENTRY_DSN', environ.get('SENTRY_DSN'))
app.config.setdefault('SENTRY_ENVIRONMENT', enviroment)
app.config.setdefault('CI_COMMIT_SHA', environ.get('CI_COMMIT_SHA'))
# Setup sentry logging
sentry_dsn = app.config.get("SENTRY_DSN")
release = app.config.get("CI_COMMIT_SHA", get_version() or "dev")
enviroment = app.config.get("SENTRY_ENVIRONMENT", "production")
if sentry_dsn:
sentry = sentry_sdk.init(
dsn=sentry_dsn,
integrations=[
# Flask integration
FlaskIntegration(),
# Mongo integration. Mongoengine uses pymongo, so we need to
# integrate pymongo.
PyMongoIntegration(),
# Sentry will automatically pick up the logging module.
#LoggingIntegration(level=logging.INFO, event_level=logging.ERROR),
],
# Set traces_sample_rate to 1.0 to capture 100%
# of transactions for performance monitoring.
# We recommend adjusting this value in production.
traces_sample_rate=1.0,
# Set sentry debug mode to true if flask is running in debug mode.
#debug=bool(app.debug),
# By default the SDK will try to use the SENTRY_RELEASE
# environment variable, or infer a git commit
# SHA as release, however you may want to set
# something more human-readable.
release=release,
environment=enviroment,
)
app.config.setdefault("SENTRY_RELEASE", sentry._client.options["release"])
logger.info("Sentry logging enabled.", extra={"SENTRY_DSN": sentry_dsn})
else:
logger.warning("Sentry DSN not found. Sentry logging disabled.")
from datetime import datetime
from secrets import token_urlsafe
from urllib.parse import urlencode
from flask import url_for
from markupsafe import Markup
from .db import db
from mongoengine import (
StringField,
IntField,
ReferenceField,
DateTimeField,
EmailField,
BooleanField,
EnumField,
)
from mongoengine.queryset import CASCADE
from flask_login import UserMixin
from bson import ObjectId
from .i18n import SupportedLocales
class User(UserMixin, db.Document):
"""
Model representing a user of the auction site.
"""
id: ObjectId
email = EmailField(required=True, unique=True)
"The user's email address."
password = StringField(required=True)
locale = StringField(default=SupportedLocales.EN.value)
currency = StringField(max_length=3)
"The user's preferred currency."
timezone = StringField(max_length=50)
created_at = DateTimeField(required=True, default=datetime.utcnow)
is_disabled = BooleanField(default=False)
"Whether the user is disabled."
@property
def image_url(self) -> str:
"""
Return the URL of the user's avatar.
"""
import hashlib
digest = hashlib.md5(self.email.lower().encode("utf-8")).hexdigest()
default = url_for("static", filename="img/default-profile.png", _external=True)
params = urlencode({"s": 200, "d": default})
return f"https://www.gravatar.com/avatar/{digest}?{params}"
@property
def is_active(self) -> bool:
"""
Return whether the user is active.
This is used by Flask-Login to determine whether the user is
allowed to log in.
"""
return not self.is_disabled
def get_id(self) -> str:
"""
Return the user's id as a string.
"""
return str(self.id)
def __html__(self) -> str:
"""
Return a html representation of the user.
"""
url = url_for("auth.profile", email=self.email)
username = self.email.split("@")[0]
safe_username = Markup.escape(username)
return f"<a href=\"{url}\" class=\"profile-link\">@{safe_username}</a>"
class Item(db.Document):
"""
A model for items that are listed on the auction site.
"""
# Create index for sorting items by closing date
meta = {"indexes": [
{"fields": [
"closes_at",
]}
]}
title = StringField(max_length=100, required=True)
description = StringField(max_length=2000, required=True)
starting_bid = IntField(required=True, min_value=0)
seller = ReferenceField(User, required=True)
winning_bid = ReferenceField("Bid")
closed = BooleanField(default=False)
"Whether the item has been closed."
created_at = DateTimeField(required=True, default=datetime.utcnow)
closes_at = DateTimeField()
@property
def is_open(self) -> bool:
"""
Return whether the item is open for bidding.
"""
if self.closed:
return False
return self.closes_at > datetime.utcnow()
class Bid(db.Document):
"""
A model for bids on items.
"""
meta = {"indexes": [
{"fields": [
"amount",
"item",
"created_at",
]}
]}
amount = IntField(required=True, min_value=0)
"Indicates the value of the bid."
bidder = ReferenceField(User, required=True)
"User who placed the bid."
item = ReferenceField(Item, required=True)
"Item that the bid is for."
created_at = DateTimeField(required=True, default=datetime.utcnow)
"Date and time that the bid was placed."
class AccessToken(db.Document):
"""
Access token for a user.
This is used to authenticate API requests.
"""
meta = {"indexes": [
{"fields": [
"token",
"user",
"expires",
]}
]}
name = StringField(max_length=100, required=True)
"Human-readable name for the token."
user = ReferenceField(User, required=True, reverse_delete_rule=CASCADE)
"User that the token is for."
token = StringField(required=True, unique=True, default=token_urlsafe)
"The token string."
last_used_at = DateTimeField(required=False)
"Date and time that the token was last used."
created_at = DateTimeField(required=True, default=datetime.utcnow)
"Date and time that the token was created."
expires = DateTimeField(required=False)
"Date and time that the token expires."
class Notification(db.Document):
"""
Represents a message between two users, or a message to a user from
the system.
"""
meta = {"indexes": [
{"fields": [
"user",
"read_at",
"created_at",
]}
]}
id: ObjectId
user = ReferenceField(User, required=True, reverse_delete_rule=CASCADE)
category = StringField(max_length=100, default="message")
message = StringField(required=True)
title = StringField(max_length=120)
created_at = DateTimeField(required=True, default=datetime.utcnow)
read_at = DateTimeField(required=False)
"""
This module provides a way to send notifications to users.
"""
import dataclasses
from datetime import datetime
import logging
from flask import get_flashed_messages, jsonify, Blueprint
from flask_login import current_user, login_required
from flask_babel import force_locale, lazy_gettext
from .models import Notification, User
bp = Blueprint('notification', __name__, url_prefix='/')
logger = logging.getLogger(__name__)
@dataclasses.dataclass
class Message:
"""
Represents a message to be displayed to the user.
"""
message: str
category: str
# Additional field for the template
title: str = lazy_gettext("Message")
created_at: datetime = dataclasses.field(default_factory=datetime.utcnow)
def init_notification(app):
"""
Initialize the notifications module.
"""
app.register_blueprint(bp)
app.jinja_env.globals.update(get_notifications=get_notifications)
def send_notification(user, message, category="message", title=None):
"""
Send a notification to the given user.
:param user: The user to send the message to.
:param subject: The subject of the message.
:param message: The message to send.
"""
# Change the locale to the message recipient locale.
with force_locale(user.locale):
notification = Notification(
user=user,
message=str(message),
category=category,
title=str(title),
)
notification.save()
def get_notifications(user: User = current_user) -> list[Message]:
"""
Get the messages for the given user.
Flash messages are returned first, followed by database messages.
Messages are marked as read when they are retrieved.
Notice: the listing query might not return recently added messages.
:param user: The user to get the messages for.
:return: A list of messages.
"""
messages = []
# Get the flash messages first
for category, message in get_flashed_messages(with_categories=True):
messages.append(Message(message, category))
if user is None or not user.is_authenticated:
logger.debug("User is not authenticated, returning flash messages.")
return messages
# Get the database messages
notifications = Notification.objects(user=user, read_at=None).order_by('-created_at').all()
for notification in notifications:
messages.append(Message(notification.message, notification.category, notification.title))
# Mark the messages as read
notifications.update(read_at=datetime.utcnow())
return messages
@bp.route('/notifications.json', methods=('GET',))
@login_required
def user_notifications():
"""
Show the form to send a message to the given user.
"""
user = current_user
# Convert the notifications to a list of dictionaries
notifications = []
for notification in get_notifications(user):
notifications.append(dataclasses.asdict(notification))
return jsonify({
"success": True,
"notifications": notifications,
})
"""
This module contains the APScheduler extension.
This extension is used to schedule background tasks.
"""
from datetime import datetime, timedelta
import logging
from random import randint
from flask_apscheduler import APScheduler
from apscheduler.schedulers import SchedulerAlreadyRunningError
from mongoengine import signals, Q
from .models import Item
from .items import handle_item_closing
logger = logging.getLogger(__name__)
# Having the scheduler as a global variable is not ideal, but it's the easiest
# way to make it accessible.
scheduler = APScheduler()
def init_scheduler(app):
"""
Initialize the APScheduler extension.
This function is meant to be called from the create_app() function.
"""
try:
scheduler.init_app(app)
# Due to the scheduler being utilised as global variable, check if
# the scheduler is already running. If it is, then it means that the
# scheduler has already been initialised.
if not scheduler.running and not app.config.get('TESTING'):
# Add a signal handler to schedule a task to close the item when the auction
# ends.
signals.post_save.connect(_schedule_item_closing_task, sender=Item)
# Add a batch task to close expired bids every 15 minutes. This is to ensure
# that the bids are closed even if the server is restarted.
scheduler.add_job(trigger='interval', minutes=15,
func=_close_items,
id='close-items')
# Add a task to update the currency rates from the European Central Bank every
# day at random time between 5:00 and 5:59.
scheduler.add_job(trigger='cron', hour=5, minute=randint(0, 59),
func=_update_currency_rates,
id='update-currency-rates')
with app.app_context():
scheduler.start()
logger.debug('APScheduler started')
except SchedulerAlreadyRunningError:
logger.debug('APScheduler already running')
except Exception as exc:
logger.exception("Failed to initialize APScheduler: %s", exc)
return app
def _handle_item_closing(item_id):
"""
Handle the closing of an item.
This function is meant to be run by the APScheduler, and is not meant to be
called directly.
"""
with scheduler.app.app_context():
item = Item.objects.get(id=item_id)
handle_item_closing(item)
# Even as this is named function, it's used as a closure, so it can access
# the scheduler variable.
def _schedule_item_closing_task(sender, document, **kwargs): # pylint: disable=unused-argument
"""
Schedule a task to close the item when the auction ends.
This function is meant to be connected to the post_save signal of the Item
model.
"""
if not document.closes_at:
# The item does not have an auction end time, so there is no need to
# schedule a task to close it.
logger.debug("Not scheduling closing, as item %s does not have an auction end time", document.id)
return
if document.closed:
# The item is already closed, so there is no need to schedule a task to
# close it.
return
logger.debug('Scheduling task to close item %s', document.id)
scheduler.add_job(
func=_handle_item_closing,
args=(document.id,),
trigger='date',
run_date=document.closes_at + timedelta(seconds=1),
id=f'close-item-{document.id}',
)
def _close_items():
"""
Close expired bids.
This function is meant to be run by the APScheduler, and is not meant to be
called directly.
"""
with scheduler.app.app_context():
logger.info("Running scheduled task 'close-items'")
# Get items that are past the closing date, and are not already closed
closes_before = datetime.utcnow() + timedelta(seconds=2)
items = Item.objects(Q(closed=None) | Q(closed=False), closes_at__lt=closes_before).all()
logger.debug("Closing %d items", len(items))
# Close each item
for item in items:
try:
# Make sure item is not already closed
if item.closed:
continue
handle_item_closing(item)
except Exception as exc:
logger.error("Error closing items: %s", exc, exc_info=True, extra={
'item_id': item.id,
})
def _update_currency_rates():
"""
Update the currency rates from the European Central Bank.
This function is meant to be run by the APScheduler, and is not meant to be
called directly.
"""
from .currency import fetch_currency_file
with scheduler.app.app_context():
logger.debug("Running scheduled task 'update-currency-rates'")
fetch_currency_file()
src/tjts5901/static/flags/en_GB.png

9.91 KiB

src/tjts5901/static/flags/fi_FI.png

5.77 KiB

src/tjts5901/static/flags/sv_SE.png

6.34 KiB

src/tjts5901/static/flags/tlh.png

7.75 KiB

File added
src/tjts5901/static/img/city.png

1.37 MiB

src/tjts5901/static/img/default-profile.png

77.1 KiB

/**
* Display message
*/
function showMessage(message, category="message", created_at=Date.now()) {
// Insert new toast
const html = document.querySelector("#message-toast").content.cloneNode(true);
html.classList += " " + category;
html.querySelector(".message").innerHTML = message;
html.querySelector("time.created-at").setAttribute("datetime", created_at);
ago = moment(created_at).fromNow();
html.querySelector("time.created-at").append(ago);
document.querySelector("#messages").append(html);
// Get the last inserted toast - the one we just appended
// and show it with bootsrap api
const toasts = document.querySelectorAll("#messages .toast");
const element = toasts[toasts.length-1];
let toast_options = {
'delay': 10000,
'autohide': false,
};
// Handle toast differenlty depending on category
switch(category) {
case "error":
element.classList += " bg-danger text-white"
toast_options['autohide'] = false;
break;
case "success":
element.classList += " bg-success text-white"
toast_options['autohide'] = true;
default:
break;
}
const toast = new bootstrap.Toast(element, toast_options);
toast.show();
}
/**
* When page is loaded, display notifications.
*/
window.addEventListener('load', function() {
// Populate notifications from the page first
let delay = 0;
notifications.forEach(msg => {
// Use delay as timeout to make them appear neatly.
setTimeout(() => showMessage(msg.message, msg.category, msg.created_at), delay += 150);
});
// Start timed loop to fetch new notifications from backend
setInterval(() => {
delay = 0;
// Fetch notifications from backend
fetch(NOTIFICATION_URL)
.then(response => response.json())
.then(data => {
data['notifications'].forEach(msg => {
setTimeout(() => showMessage(msg.message, msg.category, msg.created_at), delay += 150);
});
});
}, NOTIFICATION_WAIT_TIME);
})
/** Add style here **/
@font-face {
font-family: "Klingon";
src: url("fonts/klingon.ttf");
}
#locale-selector a {
background-repeat: no-repeat;
background-position: 2px 50%;
background-size: 18px 12px;
padding-left: 24px;
image-rendering: crisp-edges;
}
#locale-selector a[href$="locale=en_GB"] {
background-image: url('flags/en_GB.png');
}
#locale-selector a[href$="locale=fi_FI"] {
background-image: url('flags/fi_FI.png');
}
#locale-selector a[href$="locale=sv_SE"] {
background-image: url('flags/sv_SE.png');
}
#locale-selector a[href$="locale=tlh"] {
background-image: url('flags/tlh.png');
}
html[lang="tlh"] *, *[lang="tlh"] {
font-family: "Klingon", sans-serif !important;
}
This diff is collapsed.
File added
File added