PCv5/app/models/user.py

570 lines
19 KiB
Python
Raw Normal View History

from datetime import date
2018-09-29 23:51:45 +02:00
from flask_login import UserMixin
2019-12-22 14:58:19 +01:00
from sqlalchemy import func as SQLfunc
2019-12-09 23:24:05 +01:00
from os.path import isfile
from PIL import Image
from app import app, db
from app.models.priv import SpecialPrivilege, Group, GroupMember, \
GroupPrivilege
from app.models.trophy import Trophy, TrophyMember, Title
2019-09-01 12:30:41 +02:00
from app.models.notification import Notification
import app.utils.unicode_names as unicode_names
import app.utils.ldap as ldap
from app.utils.unicode_names import normalize
2019-12-09 23:24:05 +01:00
from config import V5Config
2018-11-01 21:01:05 +01:00
import werkzeug.security
import math
import app
2019-12-22 15:02:20 +01:00
import os
2021-07-23 23:28:18 +02:00
import json
2018-09-29 23:51:45 +02:00
2019-08-20 17:34:00 +02:00
2018-09-29 23:51:45 +02:00
class User(UserMixin, db.Model):
    """Base record for any website user, logged in (Member) or not (Guest)."""

    __tablename__ = 'user'

    # Minimum and maximum user name length
    NAME_MINLEN = 3
    NAME_MAXLEN = 32

    # User ID, should be used to refer to any user. The actual user can either
    # be a guest (with IP as key) or a member (with this ID as key).
    id = db.Column(db.Integer, primary_key=True)
    # User type (polymorphic discriminator)
    type = db.Column(db.String(30))

    # Other fields populated automatically through relations:
    #   <posts>  relationship populated from the Post class.

    # Rows of this base class are tagged with the table name in [type]
    __mapper_args__ = dict(
        polymorphic_identity=__tablename__,
        polymorphic_on=type,
    )

    def __repr__(self):
        """Short debug representation showing the user ID."""
        return '<User: #{}>'.format(self.id)
2018-09-29 23:51:45 +02:00
2019-08-20 17:34:00 +02:00
class Guest(User):
    """Unregistered user with minimal privileges."""

    __tablename__ = 'guest'
    __mapper_args__ = dict(polymorphic_identity=__tablename__)

    # ID of the [User] entry
    id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True)
    # Reusable username, cannot be chosen as the name of a member
    # but will be distinguished at rendering time if a member takes it later
    name = db.Column(db.Unicode(User.NAME_MAXLEN))

    def __init__(self, name):
        """Create a guest known under the given name."""
        self.name = name

    def __repr__(self):
        """Short debug representation showing the guest name."""
        return '<Guest: {}>'.format(self.name)
class Member(User):
    """Registered user with full access to the site's features."""

    __tablename__ = 'member'
    __mapper_args__ = dict(polymorphic_identity=__tablename__)

    # Id of the [User] entry
    id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True)

    # Primary attributes (needed for the system to work)
    name = db.Column(db.Unicode(User.NAME_MAXLEN), index=True)
    norm = db.Column(db.Unicode(User.NAME_MAXLEN), index=True, unique=True)
    email = db.Column(db.Unicode(120), index=True, unique=True)
    email_confirmed = db.Column(db.Boolean)
    password_hash = db.Column(db.String(255))
    xp = db.Column(db.Integer)
    register_date = db.Column(db.Date, default=date.today)

    # Number that, together with the member ID, names the avatar file
    avatar_id = db.Column(db.Integer, default=0)

    @property
    def avatar(self):
        """File name of the avatar: "<member id>_<avatar id>.png"."""
        return '{}_{}.png'.format(self.id, self.avatar_id)

    @property
    def level(self):
        """Pair (level, hundredths of progress within that level) from xp."""
        scale = 100 / math.asinh(10)
        raw_level = math.asinh(self.xp / 1000) * scale
        whole = int(raw_level)
        hundredths = int(raw_level * 100) % 100
        return whole, hundredths

    # Groups and related privileges
    groups = db.relationship('Group', secondary=GroupMember,
                             back_populates='members')

    # Personal information, all optional
    bio = db.Column(db.UnicodeText)
    signature = db.Column(db.UnicodeText)
    birthday = db.Column(db.Date)

    # Displayed title, if set
    title_id = db.Column(db.Integer, db.ForeignKey('title.id'), nullable=True)
    title = db.relationship('Title', foreign_keys=title_id)

    # Settings
    newsletter = db.Column(db.Boolean, default=False)
    theme = db.Column(db.Unicode(32))

    # Relations
    trophies = db.relationship('Trophy', secondary=TrophyMember,
                               back_populates='owners')
    topics = db.relationship('Topic')
    programs = db.relationship('Program')
    comments = db.relationship('Comment')

    # Other fields populated automatically through relations:
    #   <notifications>  List of unseen notifications (of type Notification)
    #   <polls>          Polls created by the member (of class Poll)
2019-11-21 15:31:46 +01:00
def __init__(self, name, email, password):
    """
    Register a new member.
      name      Display name; its normalized form is stored in [norm]
      email     Mail address
      password  Raw password, only stored from here when LDAP is not used
    """
    # Identity
    self.name = name
    self.norm = unicode_names.normalize(name)
    self.email = email
    # The address counts as confirmed when confirmation is not enabled
    self.email_confirmed = not V5Config.ENABLE_EMAIL_CONFIRMATION

    # Workflow with LDAP enabled is User → Postgresql → LDAP → set password
    if not V5Config.USE_LDAP:
        self.set_password(password)

    # Starting values of a fresh account
    self.xp = 0
    self.theme = 'default_theme'
    self.bio = ""
    self.signature = ""
    self.birthday = None
2021-07-23 23:28:18 +02:00
def to_json(self, admin=False):
    """
    Returns the JSON representation of a user, as a string.
      admin <bool>: if all values may be returned or just public ones

    Dates (such as the birthday) are written in ISO 8601 format, because the
    json module cannot serialize date objects by itself.
    """
    if admin:
        fields = ["name", "norm", "email", "email_confirmed", "xp",
                  "theme", "bio", "signature", "birthday"]
    else:
        fields = ["name", "norm", "xp", "bio", "signature"]

    def encode(value):
        # Called by json.dumps() only for values it cannot serialize natively
        if isinstance(value, date):
            return value.isoformat()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable")

    output = {field: getattr(self, field) for field in fields}
    return json.dumps(output, default=encode)
def generate_guest_name(self):
    """
    Generates a unique guest name to transfer contents to. The name is the
    member's name followed by "_<n>", with the smallest n (starting from 0)
    for which no Guest with that name exists.
    """
    suffix = 0
    while True:
        candidate = f"{self.name}_{suffix}"
        if not Guest.query.filter_by(name=candidate).first():
            return candidate
        suffix += 1
def transfer_posts(self, other):
    """
    Transfers all the posts (topics, programs and comments) to another user.
    This is generally used to transfer ownership to a newly-created Guest
    before deleting an account.
      other  User that becomes the author of every post
    """
    # Each collection is copied before looping: re-assigning the author may
    # remove the post from the live relationship list (depending on how the
    # post models map their author), and mutating a list while iterating over
    # it silently skips elements.
    for topic in list(self.topics):
        topic.author = other
        db.session.add(topic)

    for program in list(self.programs):
        program.author = other
        db.session.add(program)

    for comment in list(self.comments):
        comment.author = other
        db.session.add(comment)
def delete_posts(self):
    """Deletes the user's posts (topics, programs and comments)."""
    # Each collection is copied before looping: if delete() removes the post
    # from the live relationship list, iterating over that same list would
    # skip every other post.
    for topic in list(self.topics):
        topic.delete()

    for program in list(self.programs):
        program.delete()

    for comment in list(self.comments):
        comment.delete()
def delete(self):
    """
    Deletes the user, but not the posts; use either transfer_posts() or
    delete_posts() before calling this.

    The member's special privileges are deleted and the trophies detached,
    and that state is committed before the member itself is deleted.
    """
    special_privs = SpecialPrivilege.query.filter_by(mid=self.id).all()
    for special_priv in special_privs:
        db.session.delete(special_priv)

    # Detach all trophies and commit before removing the member
    self.trophies = []
    db.session.add(self)
    db.session.commit()

    # NOTE(review): the deletion is only registered in the session here, no
    # commit follows in this method; presumably the caller commits. Confirm.
    db.session.delete(self)
2021-02-20 19:30:18 +01:00
# Privilege checks
def priv(self, priv):
    """
    Check whether the member has the specified privilege, either as a
    special privilege or through one of the member's groups.
    """
    special = SpecialPrivilege.query.filter_by(mid=self.id, priv=priv).first()
    if special:
        return True

    # Otherwise, look for the privilege in any group the member belongs to
    group_ids = [group.id for group in self.groups]
    granted = db.session.query(Group, GroupPrivilege).filter(
        Group.id.in_(group_ids),
        GroupPrivilege.gid == Group.id,
        GroupPrivilege.priv == priv).first()
    return granted is not None
def special_privileges(self):
    """List member's special privileges, sorted."""
    rows = SpecialPrivilege.query.filter_by(mid=self.id).all()
    names = [row.priv for row in rows]
    names.sort()
    return names
Review of privileges and forum permissions

* Sorted privileges into categories, similar to the v4.3 style

Added privilege check utilities:
* Forum: is_news(), is_default_accessible() and is_default_postable()
* Member: can_access_forum(), can_post_in_forum(), can_edit_post(), and can_delete_post()

Unfortunately current_user is not a Guest when logged out, so one cannot usually write current_user.can_*() without checking for authentication first; the checks are therefore still somewhat verbose.

Reviewed forum permissions; the following permission issues have been fixed (I have tested most but not all of them prior to fixing):
* app/routes/forum/index.py: Users that were not meant to access a forum could still obtain a listing of the topics
* app/routes/forum/topic.py: Users that were not meant to see topics could still read them by browsing the URL
* app/routes/forum/topic.py: Authenticated users could post in any topic, including ones that they should not have access to
* app/routes/posts/edit.py: Users with edit.posts (e.g. mods) could edit and delete messages in forums they can't access (e.g. creativecalc)
* app/templates/account/user.html: Users with admin panel access would see account editing links they can't use (affects developers)
* app/templates/base/navbar/forum.html: The "Forum" tab would list all forums, including ones the user doesn't have access to
* app/templates/forum/index.html: Users would see every single forum, including ones they can't access
* app/template/widgets/thread.html: Anyone would see Edit/Delete links on every message, even though most were unusable

Miscellaneous changes:
* app/routes/forum/topic.py: Ordered comments by date as intended, which I assume worked by chance until now
* Removed the old assets/privs.txt file, which is now superseded by the list implemented in app/data/groups.yaml

This commit changes group and forum information; run master.py with:
  @> forums update
  @> groups update
2021-02-26 18:29:25 +01:00
def can_access_forum(self, forum):
    """
    Whether this member can read the forum's contents: either the forum is
    accessible by default, or the member has "forum.access.<forum prefix>".
    """
    open_by_default = forum.is_default_accessible()
    return open_by_default or self.priv(f"forum.access.{forum.prefix}")
def can_post_in_forum(self, forum):
    """
    Whether this member can post in the forum. This is the case when the
    forum is postable by default, or when the member has one of these
    privileges:
      "forum.post-news"            (only applies to news forums)
      "forum.post.<forum prefix>"
      "forum.post-anywhere"
    """
    return forum.is_default_postable() or \
        (forum.is_news() and self.priv("forum.post-news")) or \
        self.priv(f"forum.post.{forum.prefix}") or \
        self.priv("forum.post-anywhere")
def can_access_post(self, post):
    """Whether this member can access the post's forum (if any)."""
    # Posts from other types of content are all public
    if post.type != "comment" or not post.thread.owner_topic:
        return True

    # NOTE(review): the guard tests thread.owner_topic but the forum is read
    # through thread.owner_post; presumably both designate the same topic.
    return self.can_access_forum(post.thread.owner_post.forum)
def can_edit_post(self, post):
    """
    Whether this member can edit the post: the post must be accessible, and
    the member must either be its author or have "edit.posts".
    """
    accessible = self.can_access_post(post)
    if not accessible:
        return accessible
    return (post.author == self) or self.priv("edit.posts")
def can_delete_post(self, post):
    """
    Whether this member can delete the post: the post must be accessible,
    and the member must either be its author or have "delete.posts".
    """
    accessible = self.can_access_post(post)
    if not accessible:
        return accessible
    return (post.author == self) or self.priv("delete.posts")
def can_punish_post(self, post):
    """
    Whether this member can delete the post with penalty: the post must be
    accessible and the member must have "delete.posts".
    """
    accessible = self.can_access_post(post)
    return self.priv("delete.posts") if accessible else accessible
def can_set_topcomment(self, comment):
    """
    Whether this member can designate the comment as top comment. This
    requires a post of type "comment", written by the author of the thread's
    owner post, which the member must be allowed to edit.
    """
    if comment.type == "comment":
        owner = comment.thread.owner_post
        return self.can_edit_post(owner) and (comment.author == owner.author)
    return False
def update(self, **data):
    """
    Update all or part of the user's metadata. The [data] dictionary
    accepts the following keys:
      "name"             str    User name (the normalized name follows)
      "email"            str    User mail address
      "email_confirmed"  bool   User mail address confirmed
      "password"         str    Raw password
      "bio"              str    Biography
      "signature"        str    Post signature
      "birthday"         date   Birthday date
      "newsletter"       bool   Newsletter setting
      "xp"               int    Experience points
      "avatar"           File   Avatar image
      "title"            id     Key of the Title to display
      "theme"            str    Name of theme file

    For future compatibility, other attributes are silently ignored. None
    values can be specified and are ignored.

    It is the caller's responsibility to check that the request sender has
    the right to change user names, password... otherwise this method will
    turn out dangerous!
    """
    # Drop the keys whose value is None: they mean "leave unchanged"
    changes = {key: value for key, value in data.items()
               if value is not None}

    # TODO: verify good type of those args, think about the password mgt
    # Beware of LDAP injections

    if "name" in changes:
        self.name = changes["name"]
        self.norm = normalize(changes["name"])

    if "email" in changes:
        self.email = changes["email"]
        if V5Config.USE_LDAP:
            ldap.set_email(self.norm, self.email)

    if "password" in changes:
        self.set_password(changes["password"])

    # Plain attributes, copied as they are
    for field in ("bio", "signature", "birthday", "newsletter"):
        if field in changes:
            setattr(self, field, changes[field])

    if "avatar" in changes:
        self.set_avatar(changes["avatar"])

    if "title" in changes:
        self.title = Title.query.get(changes["title"])

    # More plain attributes; "email_confirmed" and "xp" are for admins only
    for field in ("theme", "email_confirmed", "xp"):
        if field in changes:
            setattr(self, field, changes[field])
def set_avatar(self, avatar):
    """
    Replace the member's avatar.
      avatar  Image file, in any form accepted by PIL's Image.open()

    The image is shrunk to fit in 128x128 pixels and saved as PNG in the
    "avatars" folder of V5Config.DATA_FOLDER, under a new avatar id. The
    database session is committed in the process.
    """
    avatar_folder = os.path.join(V5Config.DATA_FOLDER, "avatars")

    # Save old avatar filepath
    old_avatar = os.path.join(avatar_folder, self.avatar)

    # Resize & convert image
    size = 128, 128
    im = Image.open(avatar)
    # Image.ANTIALIAS was an alias of LANCZOS; it was removed in Pillow 10
    im.thumbnail(size, Image.LANCZOS)

    # Change avatar id
    # TODO: verify concurrency behavior
    current_id = db.session.query(SQLfunc.max(Member.avatar_id)).first()[0]
    # The maximum is NULL (None) if no member has an avatar id yet
    self.avatar_id = (current_id or 0) + 1
    db.session.merge(self)
    db.session.commit()

    # Save the new avatar (self.avatar now uses the new id)
    im.save(os.path.join(avatar_folder, self.avatar), 'PNG')

    # If nothing has failed, remove old one (allow failure to regularize
    # exceptional situations like missing avatar or folder migration)
    try:
        os.remove(old_avatar)
    except FileNotFoundError:
        pass
def get_public_data(self):
    """ Returns the public information of the member, as a dictionary."""
    public_fields = ("name", "xp", "register_date", "bio", "signature",
                     "birthday")
    return {field: getattr(self, field) for field in public_fields}
def add_xp(self, amount):
    """
    Reward xp to a member. If [amount] is negative, the xp total of the
    member will decrease, down to zero. The total is capped at 1000000000.
    """
    xp_cap = 1000000000
    total = self.xp + amount
    if total < 0:
        total = 0
    elif total > xp_cap:
        total = xp_cap
    self.xp = total
2018-09-29 23:51:45 +02:00
def set_password(self, password):
    """
    Set the user's password, through LDAP when it is enabled and as a salted
    hash in [password_hash] otherwise. Check whether the request sender has
    the right to do this!
    """
    if not V5Config.USE_LDAP:
        self.password_hash = werkzeug.security.generate_password_hash(
            password, method='pbkdf2:sha512', salt_length=10)
        return

    ldap.set_password(self, password)
2018-09-29 23:51:45 +02:00
def check_password(self, password):
    """ Compares password against member hash or LDAP record """
    if V5Config.USE_LDAP:
        return ldap.check_password(self, password)

    stored_hash = self.password_hash
    return werkzeug.security.check_password_hash(stored_hash, password)
2018-09-29 23:51:45 +02:00
2019-09-01 12:30:41 +02:00
def notify(self, message, href=None):
    """
    Notify a user with a message.
    A hyperlink can be added to redirect to the notification source.

    Notifications are currently turned off: the method returns right away
    and the code below the return statement never runs.
    """
    return

    notification = Notification(self.id, message, href=href)
    db.session.add(notification)
    db.session.commit()
def add_group(self, g):
    """
    Add a group to the user, and notify the user if the group is new.
      g  Group object, group id (int) or group name (str)
    Raises ValueError if no group matches [g].
    Check whether or not the request sender has the right to do this!
    """
    group = g
    if isinstance(group, int):
        group = Group.query.get(group)
    elif isinstance(group, str):
        group = Group.query.filter_by(name=group).first()

    # Fail before touching self.groups: a failed lookup yields None, which
    # must not end up in the list of groups
    if group is None:
        raise ValueError(f"unknown group: {g!r}")

    if group not in self.groups:
        self.groups.append(group)
        self.notify(f"Vous avez été ajouté au groupe '{group.name}'")
def del_group(self, g):
    """
    Remove a group from the user. Nothing happens if the user is not in the
    group.
      g  Group object, group id (int) or group name (str)
    Check whether or not the request sender has the right to do this!
    """
    if isinstance(g, int):
        g = Group.query.get(g)
    elif isinstance(g, str):
        g = Group.query.filter_by(name=g).first()

    if g in self.groups:
        self.groups.remove(g)
def add_trophy(self, t):
    """
    Add a trophy to the current user, and notify the user if it is new.
      t  Trophy object, trophy id (int) or trophy name (str)
    Raises ValueError if no trophy matches [t].
    Check whether the request sender has the right to do this!
    """
    trophy = t
    if isinstance(trophy, int):
        trophy = Trophy.query.get(trophy)
    elif isinstance(trophy, str):
        trophy = Trophy.query.filter_by(name=trophy).first()

    # Fail before touching self.trophies: a failed lookup yields None, which
    # must not end up in the list of trophies
    if trophy is None:
        raise ValueError(f"unknown trophy: {t!r}")

    if trophy not in self.trophies:
        self.trophies.append(trophy)
        self.notify(f"Vous avez débloqué le trophée '{trophy.name}'")
def del_trophy(self, t):
    """
    Remove a trophy from the current user. Nothing happens if the user does
    not have the trophy.
      t  Trophy object, trophy id (int) or trophy name (str)
    Check whether the request sender has the right to do this!
    """
    if isinstance(t, int):
        t = Trophy.query.get(t)
    elif isinstance(t, str):
        t = Trophy.query.filter_by(name=t).first()

    if t in self.trophies:
        self.trophies.remove(t)
def update_trophies(self, context=None):
    """
    Auto-update trophies for the current user, then commit the session.
    Without a context (None), every trophy category is re-evaluated. Please
    use one of the following contexts when possible:
    - new-post
    - new-program
    - new-tutorial
    - new-test
    - new-event-participation
    - new-art
    - on-program-tested
    - on-program-rewarded
    - on-login
    - on-profile-update
    """

    def concerns(*contexts):
        """Whether the requested context triggers a category."""
        return context in [*contexts, None]

    def progress(trophies, value):
        """Award or delete all trophies from a progressive category."""
        for threshold, trophy_name in trophies.items():
            if value >= threshold:
                self.add_trophy(trophy_name)
            else:
                self.del_trophy(trophy_name)

    if concerns("new-post", "new-program", "new-tutorial", "new-test"):
        progress({
            20: "Premiers mots",
            500: "Beau parleur",
            1500: "Plume infaillible",
            5000: "Romancier émérite",
        }, len(self.posts))

    if concerns("new-program"):
        progress({
            5: "Programmeur du dimanche",
            10: "Codeur invétéré",
            20: "Je code donc je suis",
        }, len(self.programs))

    if concerns("new-tutorial"):
        # TODO: Number of tutorials by user
        tutorials = 0
        progress({
            5: "Pédagogue",
            10: "Encyclopédie vivante",
            25: "Guerrier du savoir",
        }, tutorials)

    if concerns("new-test"):
        # TODO: Number of tests by user
        tests = 0
        progress({
            5: "Testeur",
            25: "Grand joueur",
            100: "Hard tester",
        }, tests)

    if concerns("new-event-participation"):
        # TODO: Number of event participations by user
        participations = 0
        progress({
            1: "Participant",
            5: "Concourant encore",
            15: "Concurrent de l'extrême",
        }, participations)

    if concerns("new-art"):
        # TODO: Number of art posts by user
        arts = 0
        progress({
            5: "Dessinateur en herbe",
            30: "Open pixel",
            100: "Roi du pixel",
        }, arts)

    if concerns("on-program-tested"):
        # TODO: Number of "coups de coeur" of user
        hearts = 0
        progress({
            5: "Bourreau des cœurs",
        }, hearts)

    if concerns("on-program-rewarded"):
        # TODO: Number of programs with labels
        labels = 0
        progress({
            5: "Maître du code",
        }, labels)

    if concerns("on-login"):
        # Seniority-based trophies, thresholds in days since registration
        seniority = date.today() - self.register_date
        progress({
            30: "Initié",
            365.25: "Aficionado",
            365.25 * 2: "Veni, vidi, casii",
            365.25 * 5: "Papy Casio",
            365.25 * 10: "Vétéran mythique",
        }, seniority.days)
        # TODO: Trophy "actif"

    if concerns("on-profile-update"):
        # The trophy is held exactly when the avatar file exists
        avatar_path = os.path.join(
            V5Config.DATA_FOLDER, "avatars", self.avatar)
        if isfile(avatar_path):
            self.add_trophy("Artiste")
        else:
            self.del_trophy("Artiste")

    db.session.merge(self)
    db.session.commit()
2018-09-29 23:51:45 +02:00
def __repr__(self):
    """Debug representation showing the name and its normalized form."""
    return '<Member: {} ({})>'.format(self.name, self.norm)
2018-11-01 21:01:05 +01:00
@app.login.user_loader
def load_user(id):
    """
    User loader registered through app.login.user_loader: returns the user
    with the given ID.

    Returns None when the ID cannot be converted to an integer; Flask-Login
    expects the loader to return None for an invalid ID, not to raise.
    """
    try:
        user_id = int(id)
    except (TypeError, ValueError):
        return None
    return User.query.get(user_id)