from datetime import date from flask import flash from flask_login import UserMixin from sqlalchemy import func as SQLfunc from os.path import isfile from PIL import Image from app import app, db from app.models.privs import SpecialPrivilege, Group, GroupMember, \ GroupPrivilege from app.models.trophies import Trophy, TrophyMember from app.models.notification import Notification import app.utils.unicode_names as unicode_names from app.utils.notify import notify import app.utils.ldap as ldap from config import V5Config import werkzeug.security import re import math import app import os class User(UserMixin, db.Model): """ Website user that performs actions on the post """ __tablename__ = 'user' # User ID, should be used to refer to any user. Thea actual user can either # be a guest (with IP as key) or a member (with this ID as key). id = db.Column(db.Integer, primary_key=True) # User type (polymorphic discriminator) type = db.Column(db.String(30)) # Also a [posts] relationship populated from the Post class. __mapper_args__ = { 'polymorphic_identity': __tablename__, 'polymorphic_on': type } def __repr__(self): return f'' class Guest(User): """ Unregistered user with minimal privileges """ __tablename__ = 'guest' __mapper_args__ = {'polymorphic_identity': __tablename__} # ID of the [User] entry id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) # Reusable username, cannot be chosen as the name of a member # but will be distinguished at rendering time if a member take it later name = db.Column(db.Unicode(64)) def __init__(self, name): self.name = name def __repr__(self): return f'' class Member(User): """ Registered user with full access to the website's services """ __tablename__ = 'member' __mapper_args__ = {'polymorphic_identity': __tablename__} # Id of the [User] entry id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True) # Primary attributes (needed for the system to work) name = db.Column(db.Unicode(V5Config.USER_NAME_MAXLEN), index=True) norm = db.Column(db.Unicode(V5Config.USER_NAME_MAXLEN), index=True, unique=True) email = db.Column(db.Unicode(120), index=True, unique=True) email_confirmed = db.Column(db.Boolean) password_hash = db.Column(db.String(255)) xp = db.Column(db.Integer) register_date = db.Column(db.Date, default=date.today) avatar_id = db.Column(db.Integer, default=0) @property def avatar(self): return f'{self.id}_{self.avatar_id}.png' @property def level(self): level = math.asinh(self.xp / 1000) * (100 / math.asinh(10)) return int(level), int(level * 100) % 100 # Groups and related privileges groups = db.relationship('Group', secondary=GroupMember, back_populates='members') # Personal information, all optional bio = db.Column(db.UnicodeText) signature = db.Column(db.UnicodeText) birthday = db.Column(db.Date) # Settings newsletter = db.Column(db.Boolean, default=False) # Relations trophies = db.relationship('Trophy', secondary=TrophyMember, back_populates='owners') # Displayed title # title_id = db.Column(db.Integer, db.ForeignKey('title.id')) # title = db.relationship('Title', foreign_keys=title_id) # Other fields populated automatically through relations: # List of unseen notifications (of type Notification) def __init__(self, name, email, password): """Register a new user.""" self.name = name self.norm = unicode_names.normalize(name) self.email = email self.email_confirmed = not V5Config.ENABLE_EMAIL_CONFIRMATION if not V5Config.USE_LDAP: self.set_password(password) # Workflow with LDAP enabled is User → Postgresql → LDAP → set password self.xp = 0 self.bio = "" self.signature = "" self.birthday = None def delete(self): """ Deletes the user and the associated information: * Special privileges """ for sp in SpecialPrivilege.query.filter_by(mid=self.id).all(): db.session.delete(sp) db.session.commit() db.session.delete(self) db.session.commit() def priv(self, priv): """Check whether the member has the specified privilege.""" if SpecialPrivilege.query.filter_by(mid=self.id, priv=priv).first(): return True return db.session.query(Group, GroupPrivilege).filter( Group.id.in_([ g.id for g in self.groups ]), GroupPrivilege.gid==Group.id, GroupPrivilege.priv==priv).first() is not None def special_privileges(self): """List member's special privileges.""" sp = SpecialPrivilege.query.filter_by(mid=self.id).all() return sorted(row.priv for row in sp) def update(self, **data): """ Update all or part of the user's metadata. The [data] dictionary accepts the following keys: "email" str User mail ddress "password" str Raw password "bio" str Biograpy "signature" str Post signature "birthday" date Birthday date "newsletter" bool Newsletter setting "xp" int Experience points "avatar" File Avatar image For future compatibility, other attributes are silently ignored. None values can be specified and are ignored. It is the caller's responsibility to check that the request sender has the right to change user names, password... otherwise this method will turn out dangerous! """ data = {key: data[key] for key in data if data[key] is not None} # TODO: verify good type of those args, think about the password mgt # Beware of LDAP injections if "email" in data: self.email = data["email"] if V5Config.USE_LDAP: ldap.set_email(self.norm, self.email) if "password" in data: self.set_password(data["password"]) if "bio" in data: self.bio = data["bio"] if "signature" in data: self.signature = data["signature"] if "birthday" in data: self.birthday = data["birthday"] if "newsletter" in data: self.newsletter = data["newsletter"] if "avatar" in data: self.set_avatar(data["avatar"]) # For admins only if "xp" in data: self.xp = data["xp"] def set_avatar(self, avatar): # Save old avatar filepath old_avatar = V5Config.AVATARS_FOLDER + self.avatar # Resize & convert image size = 128, 128 im = Image.open(avatar) im.thumbnail(size, Image.ANTIALIAS) # Change avatar id # TODO: verify concurrency behavior current_id = db.session.query(SQLfunc.max(Member.avatar_id)).first()[0] self.avatar_id = current_id + 1 db.session.merge(self) db.session.commit() # Save the new avatar im.save(V5Config.AVATARS_FOLDER + self.avatar, 'PNG') # If nothing has failed, remove old one (allow failure to regularize # exceptional situations like missing avatar or folder migration) try: os.remove(old_avatar) except FileNotFoundError: pass def get_public_data(self): """ Returns the public information of the member.""" return { "name": self.name, "xp": self.xp, "register_date": self.register_date, "bio": self.bio, "signature": self.signature, "birthday": self.birthday, } def add_xp(self, amount): """ Reward xp to a member. If [amount] is negative, the xp total of the member will decrease, down to zero. """ self.xp = min(max(self.xp + amount, 0), 1000000000) def set_password(self, password): """ Set the user's password. Check whether the request sender has the right to do this! """ if V5Config.USE_LDAP: ldap.set_password(self, password) else: self.password_hash = werkzeug.security.generate_password_hash( password, method='pbkdf2:sha512', salt_length=10) def check_password(self, password): """ Compares password against member hash or LDAP record """ if V5Config.USE_LDAP: return ldap.check_password(self, password) else: return werkzeug.security.check_password_hash(self.password_hash, password) def notify(self, message, href=None): """ Notify a user with a message. An hyperlink can be added to redirect to the notification source """ return n = Notification(self.id, message, href=href) db.session.add(n) db.session.commit() def add_group(self, g): """ Add a group to the user. Check wheter or not the request sender has the right to do this! """ if type(g) == int: g = Group.query.get(g) if type(g) == str: g = Group.query.filter_by(name=g).first() if g not in self.groups: self.groups.append(g) self.notify(f"Vous avez été ajouté au groupe '{g.name}'") def del_group(self, g): """ Remove a group to the user. Check wheter or not the request sender has the right to do this! """ if type(g) == int: g = Group.query.get(g) if type(g) == str: g = Group.query.filter_by(name=g).first() if g in self.groups: self.groups.remove(g) def add_trophy(self, t): """ Add a trophy to the current user. Check whether the request sender has the right to do this! """ if type(t) == int: t = Trophy.query.get(t) if type(t) == str: t = Trophy.query.filter_by(name=t).first() if t not in self.trophies: self.trophies.append(t) self.notify(f"Vous avez débloqué le trophée '{t.name}'") def del_trophy(self, t): """ Delete a trophy to the current user. Check whether the request sender has the right to do this! """ if type(t) == int: t = Trophy.query.get(t) if type(t) == str: t = Trophy.query.filter_by(name=t).first() if t in self.trophies: self.trophies.remove(t) def update_trophies(self, context=None): """ Auto-update trophies for the current user. Please use one of the following contexts when possible: - new-post - new-program - new-tutorial - new-test - new-event-participation - new-art - on-program-tested - on-program-rewarded - on-login - on-profile-update """ def progress(trophies, value): """Award or delete all trophies from a progressive category.""" for level in trophies: if value >= level: self.add_trophy(trophies[level]) else: self.del_trophy(trophies[level]) if context in ["new-post", "new-program", "new-tutorial", "new-test", None]: # FIXME: Use ORM tools with careful, non-circular imports post_count = db.session.execute(f"""SELECT COUNT(*) FROM post INNER JOIN member ON member.id = post.author_id WHERE member.id = {self.id}""").first()[0] levels = { 20: "Premiers mots", 500: "Beau parleur", 1500: "Plume infaillible", 5000: "Romancier émérite", } progress(levels, post_count) if context in ["new-program", None]: # TODO: Amount of programs by the user program_count = 0 levels = { 5: "Programmeur du dimanche", 10: "Codeur invétéré", 20: "Je code donc je suis", } progress(levels, program_count) if context in ["new-tutorial", None]: # TODO: Number of tutorials by user tutorial_count = 0 levels = { 5: "Pédagogue", 10: "Encyclopédie vivante", 25: "Guerrier du savoir", } progress(levels, tutorial_count) if context in ["new-test", None]: # TODO: Number of tests by user test_count = 0 levels = { 5: "Testeur", 25: "Grand joueur", 100: "Hard tester", } progress(levels, test_count) if context in ["new-event-participation", None]: # TODO: Number of event participations by user event_participations = 0 levels = { 1: "Participant", 5: "Concourant encore", 15: "Concurrent de l'extrême", } progress(levels, event_participations) if context in ["new-art", None]: # TODO: Number of art posts by user art_count = 0 levels = { 5: "Dessinateur en herbe", 30: "Open pixel", 100: "Roi du pixel", } progress(levels, art_count) if context in ["on-program-tested", None]: # TODO: Number of "coups de coeur" of user heart_count = 0 levels = { 5: "Bourreau des cœurs", } progress(levels, heart_count) if context in ["on-program-rewarded", None]: # TODO: Number of programs with labels label_count = 0 levels = { 5: "Maître du code", } progress(levels, label_count) if context in ["on-login", None]: # Seniority-based trophies age = date.today() - self.register_date levels = { 30: "Initié", 365.25: "Aficionado", 365.25 * 2: "Veni, vidi, casii", 365.25 * 5: "Papy Casio", 365.25 * 10: "Vétéran mythique", } progress(levels, age.days) # TODO: Trophy "actif" if context in ["on-profile-update", None]: if isfile(V5Config.AVATARS_FOLDER + self.avatar): self.add_trophy("Artiste") else: self.del_trophy("Artiste") db.session.merge(self) db.session.commit() def __repr__(self): return f'' @app.login.user_loader def load_user(id): return User.query.get(int(id))