Forked from
Startuplab / Courses / TJTS5901 Continuous Software Engineering / TJTS5901 K23 Template
193 commits behind, 178 commits ahead of the upstream repository.
-
MikaelKrats authored
Fixed Conflicts: # src/tjts5901/templates/addItem.html # src/tjts5901/views.py
MikaelKrats authored: Fixed Conflicts: # src/tjts5901/templates/addItem.html # src/tjts5901/views.py
views.py 12.00 KiB
"""
Basic views for Application
===========================
"""
import base64
import logging
from typing import Optional
from datetime import datetime, timedelta
from mongoengine.queryset.visitor import Q
from flask import Blueprint, flash, redirect, render_template, request, url_for, jsonify
from .models import Item, Bid
from .auth import login_required, current_user
from flask_babel import _, get_locale
from werkzeug.exceptions import abort
from markupsafe import Markup
from .auth import login_required, current_user
from .models import Bid, Item
from .currency import (
convert_currency,
format_converted_currency,
convert_from_currency,
get_currencies,
get_preferred_currency,
REF_CURRENCY,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Main blueprint: the HTML views defined in this module.
bp = Blueprint('views', __name__)

# JSON API blueprint; declared with the ``/api`` URL prefix.
api = Blueprint('api', __name__, url_prefix='/api')

# Blueprint for documentation.
docs_bp = Blueprint('docs', __name__)

# Amount added to an item's current price to get the minimum acceptable bid.
MIN_BID_INCREMENT = 1
def get_winning_bid(item: Item) -> Optional[Bid]:
    """
    Return the (currently) winning bid for the given item.

    The winning bid is the bid with the highest amount among those created
    before the item's closing time. A failing lookup is logged as a warning
    and reported as "no winning bid" instead of being raised.

    :param item: The item to get the winning bid for.
    :return: The winning bid, or None if there is no such bid or the lookup
        failed.
    """
    try:
        # Only bids placed before the closing time are eligible.
        eligible = Bid.objects(item=item).filter(created_at__lt=item.closes_at)
        return eligible.order_by('-amount').first()
    except Exception as exc:
        log_context = {'item_id': item.id}
        logger.warning(
            "Error getting winning bid: %s", exc, exc_info=True, extra=log_context
        )
        return None
def get_item_price(item: Item) -> int:
    """
    Return the current price of the given item.

    The price is the amount of the winning bid as determined by
    ``get_winning_bid``; without such a bid it is the item's starting bid.

    :param item: The item to get the price for.
    :return: The current price.
    """
    leading_bid = get_winning_bid(item)
    return leading_bid.amount if leading_bid else item.starting_bid
def check_auction_ends():
    """
    Process auctions that have ended but have not been processed yet.

    Every item whose closing time has passed and whose ``closed_processed``
    flag is still False is marked as processed and its bids are flagged:
    the highest bid gets ``winning_bid=True``, all other bids get
    ``winning_bid=False``, and every bid gets ``auction_end_processed=True``.

    If the current user is the seller of such an item, a message about the
    outcome is flashed and ``seller_informed_about_result`` is set on the item.
    """
    # The filter must be "closed at or before now": an exact equality match
    # against the current timestamp would practically never select anything.
    ended_items = Item.objects.filter(
        Q(closes_at__lte=datetime.utcnow()) & Q(closed_processed=False)
    ).order_by('-closes_at')

    for item in ended_items:
        item.update(closed_processed=True)

        # NOTE(review): the seller is only notified when their own request
        # happens to trigger the processing of this item; if another user's
        # request processes it first, no message is ever flashed to the
        # seller. Confirm whether a separate "seller not yet informed" pass
        # is needed.
        is_seller = item.seller == current_user
        message = None
        if is_seller:
            message = "The auction of your item '{}' has ended. ".format(item.title)
            item.update(seller_informed_about_result=True)

        # Get (potential) bids for the item, highest first, and process them.
        bids = list(Bid.objects(item=item).order_by('-amount'))
        if bids:
            top_bid, *losing_bids = bids
            top_bid.update(winning_bid=True, auction_end_processed=True)
            for losing_bid in losing_bids:
                losing_bid.update(winning_bid=False, auction_end_processed=True)
            if is_seller:
                message += "Congrats! It was sold for {}.".format(top_bid.amount)
        elif is_seller:
            message += "No one bid on your item. Try again!"

        # Only the seller gets a message; everyone else sees nothing here.
        if message is not None:
            flash(message)
def check_bids_ended():
    """
    Inform the current user about the outcome of their processed bids.

    Looks up the current user's bids that are marked ``auction_end_processed``
    but not yet ``bidder_informed``, flashes a won/lost message for each of
    them and then marks the bid as ``bidder_informed``.
    """
    pending = Bid.objects.filter(
        Q(bidder=current_user)
        & Q(auction_end_processed=True)
        & Q(bidder_informed=False)
    )
    for bid in pending:
        template = (
            "Congrats! You won the auction of '{}' with your bid over {}."
            if bid.winning_bid == True  # noqa: E712 - keep the equality semantics
            else "Pity! You were not successfull with the auction of '{}' with your bid over {}."
        )
        flash(template.format(bid.item.title, bid.amount))
        # Mark as informed so the message is not flashed again.
        bid.update(bidder_informed=True)
@bp.route("/test")
def test_item_adding():
    """
    Create and save a hard-coded test item, then respond with ``"OK"``.

    The item uses the current user as seller, the bytes of ``test.jpeg`` as
    image and closes one day from now.
    """
    # NOTE(review): unlike the other views this route has no @login_required
    # although it stores current_user as the seller - confirm this is intended.
    # NOTE(review): the image path is relative to the process working
    # directory, not to this module.
    with open("src/tjts5901/test.jpeg", "rb") as handle:
        picture = handle.read()

    # Create a new item and fill in its fields one by one.
    test_item = Item()
    test_item.title = "Test title"
    test_item.description = "This is a test description"
    test_item.starting_bid = 100
    test_item.seller = current_user
    test_item.image = picture
    one_day = timedelta(days=1)
    test_item.closes_at = datetime.utcnow() + one_day
    test_item.save()

    return "OK"
@bp.route("/addItem", methods=('GET', 'POST'))
@login_required
def add_item():
    """
    AddItem page.

    On GET the form is rendered. On POST the submitted fields (``nItem``,
    ``description``, ``currency``, ``sPrice``, ``image`` and ``duration``)
    are validated; a valid submission is stored as a new item owned by the
    current user and the client is redirected to the listing. On any problem
    the error is flashed and the form is rendered again.

    ``duration`` is parsed as hours and minutes separated by a colon
    (``HH:MM``); anything else is reported as an item creation error.
    """
    if request.method == 'POST':
        title = request.form.get('nItem')
        description = request.form.get('description')
        currency = request.form.get('currency', REF_CURRENCY)
        image = request.files['image']
        duration = request.form.get('duration')

        # A missing or malformed price has to end up as a validation message,
        # not as an unhandled exception from the currency conversion.
        try:
            starting_price = convert_from_currency(request.form['sPrice'], currency)
        except Exception as exc:
            logger.info("Could not parse starting price: %s", exc)
            starting_price = None

        # As before, when several checks fail the last failing one is reported.
        error = None
        if not title:
            error = 'Item name is required.'
        if not starting_price or starting_price < 1:
            error = Markup(_("Starting bid must be greater than %(amount)s.", amount=format_converted_currency(1, currency)))
        if not duration:
            error = 'Duration is required.'

        if error is None:
            try:
                # Split on the separator instead of slicing fixed positions so
                # that e.g. "1:30" works and extra components are ignored.
                hours, minutes = (int(part) for part in duration.split(':')[:2])
                item = Item(
                    title=title,
                    description=description,
                    starting_bid=starting_price,
                    seller=current_user,
                    closes_at=datetime.utcnow() + timedelta(hours=hours, minutes=minutes),
                    image=image,
                )
                item.save()
                flash(_('Item listed successfully!'))
            except Exception as exc:
                error = f"Error creating item: {exc!s}"
                logger.exception("Error creating item")
            else:
                return redirect(url_for('views.list_bid'))

        flash(error)

    # Get the list of currencies, and map them to their localized names;
    # currencies without a localized name fall back to their code.
    names = get_locale().currencies
    currencies = {code: names.get(code, code) for code in get_currencies()}

    return render_template(
        'addItem.html',
        currencies=currencies,
        default_currency=get_preferred_currency(),
    )
@bp.route("/")
@bp.route("/listBid")
@login_required
def list_bid():
    """
    List all items currently on auction.

    Before rendering, ended auctions and the current user's processed bids
    are checked so that pending notifications get flashed. Each listed item
    is decorated with a base64 encoded image, a formatted creation time and
    its current price.
    """
    check_auction_ends()
    check_bids_ended()

    # Only items that are still on sale are shown.
    now = datetime.utcnow()
    open_items = Item.objects.filter(closes_at__gt=now).order_by('-closes_at')

    for entry in open_items:
        raw_image = entry.image.read()
        entry.image_base64 = base64.b64encode(raw_image).decode('utf-8')
        entry.created_at = entry.created_at.strftime("%Y-%m-%d %H:%M:%S")
        entry.current_price = get_item_price(entry)

    return render_template("listBid.html", items=open_items)
@bp.route("/item/<id>/bid", methods=('POST',))
@login_required
def bid(id):
    """
    Save a bid on an item (HTML form endpoint).

    The submitted ``bid`` form value is read as an integer amount and used
    as-is (no currency conversion is applied). The bid is rejected with a
    flashed message when

    * the amount is missing, not an integer, or lower than the item's current
      price plus ``MIN_BID_INCREMENT``,
    * the item has already closed, or
    * the current user is the seller of the item.

    Otherwise the bid is stored. In every case the client is redirected to
    the listing page.

    :param id: The id of the item to bid on.
    """
    item = Item.objects.get_or_404(id=id)

    # A new bid has to beat the current price (highest bid so far, or the
    # starting bid), not merely the starting bid.
    min_amount = get_item_price(item) + MIN_BID_INCREMENT

    # Missing or non-numeric input is treated like a too-low bid instead of
    # raising out of the view.
    try:
        amount = int(request.form.get('bid'))
    except (TypeError, ValueError):
        amount = None

    if amount is None or amount < min_amount:
        flash(_("Bid must be at least %(min_amount)s", min_amount=format_converted_currency(min_amount)))
        return redirect(url_for('views.list_bid'))

    if item.closes_at < datetime.utcnow():
        flash("This item is no longer on sale.")
        return redirect(url_for('views.list_bid'))

    if item.seller == current_user:
        flash("You cannot bid on your own item. Bid was not saved.")
        return redirect(url_for('views.list_bid'))

    try:
        new_bid = Bid(
            item=item,
            bidder=current_user,
            amount=amount,
        )
        new_bid.save()
    except Exception as exc:
        logger.error("Error placing bid: %s", exc, exc_info=True, extra={
            'item_id': item.id,
            'amount': amount,
        })
        flash(_("Error placing bid: %(exc)s", exc=exc))
    else:
        flash(("Bid placed successfully!"))

    return redirect(url_for('views.list_bid', id=id))
@api.route('<id>/bids', methods=('GET',))
@login_required
def api_item_bids(id):
    """
    Get the bids for an item.

    The bids are ordered by amount, highest first, and each one is
    serialised with its ``to_json`` method.

    :param id: The id of the item to get bids for.
    :return: A JSON response containing the bids.
    """
    item = Item.objects.get_or_404(id=id)
    ordered_bids = Bid.objects(item=item).order_by('-amount')
    payload = {
        'success': True,
        'bids': [entry.to_json() for entry in ordered_bids],
    }
    return jsonify(payload)
def _api_bid_error(message):
    """Build the JSON error response used by the bid API."""
    return jsonify({
        'success': False,
        'error': message,
    })


@api.route('<id>/bids', methods=('POST',))
@login_required
def api_item_place_bid(id):
    """
    Place a bid on an item.

    Reads ``amount`` (required) and ``currency`` (defaults to
    ``REF_CURRENCY``) from the form data and converts the amount with
    ``convert_from_currency``. The bid is rejected when the amount is
    missing or cannot be converted, when it is lower than the item's current
    price plus ``MIN_BID_INCREMENT``, or when the item has already closed.

    :param id: The id of the item to bid on.
    :return: A JSON response with ``success`` and either the stored ``bid``
        or an ``error`` message string.
    """
    item = Item.objects.get_or_404(id=id)
    min_amount = get_item_price(item) + MIN_BID_INCREMENT

    # Error messages are built with gettext so that the placeholders get
    # filled in and the response always carries a plain string.
    try:
        local_amount = request.form['amount']
        currency = request.form.get('currency', REF_CURRENCY)
        amount = convert_from_currency(local_amount, currency)
    except KeyError:
        return _api_bid_error(
            _("Missing required argument %(argname)s", argname='amount')
        )
    except ValueError:
        return _api_bid_error(
            _("Invalid value for argument %(argname)s", argname='amount')
        )
    except Exception as exc:
        return _api_bid_error(
            _("Error parsing argument %(argname)s: %(exc)s", argname='amount', exc=exc)
        )

    if amount < min_amount:
        return _api_bid_error(
            _("Bid must be at least %(min_amount)s", min_amount=min_amount)
        )

    if item.closes_at < datetime.utcnow():
        return _api_bid_error(_("This item is no longer on sale."))

    # NOTE(review): the HTML bid view refuses bids from the item's seller;
    # this endpoint has no such check - confirm whether it should.
    try:
        new_bid = Bid(
            item=item,
            bidder=current_user,
            amount=amount,
        )
        new_bid.save()
    except Exception as exc:
        logger.error("Error placing bid: %s", exc, exc_info=True, extra={
            'item_id': item.id,
            'bidder_id': current_user.id,
            'amount': amount,
        })
        return _api_bid_error(_("Error placing bid: %(exc)s", exc=exc))

    return jsonify({
        'success': True,
        'bid': new_bid.to_json(),
    })
@bp.route("/view/<id>")
@login_required
def page_bid(id):
    """
    Item view page.

    Renders the item details together with the minimum acceptable bid, both
    as stored and converted to the user's preferred currency.

    :param id: The id of the item to show.
    """
    item = Item.objects.get_or_404(id=id)

    # Lowest bid that would currently be accepted, plus its converted value.
    lowest_bid = get_item_price(item) + MIN_BID_INCREMENT
    preferred_currency = get_preferred_currency()
    converted_lowest_bid = convert_currency(lowest_bid, preferred_currency)

    # Prepare display-only attributes for the template.
    raw_image = item.image.read()
    item.image_base64 = base64.b64encode(raw_image).decode('utf-8')
    item.created_at = item.created_at.strftime("%Y-%m-%d %H:%M:%S")

    context = {
        'item': item,
        'min_bid': lowest_bid,
        'local_min_bid': converted_lowest_bid,
        'local_currency': preferred_currency,
    }
    return render_template('view.html', **context)