PCv5/master.py

399 lines
12 KiB
Python
Executable File

#! /usr/bin/python3
from app import app, db
from app.models.user import Member, Group, GroupPrivilege
from app.models.priv import SpecialPrivilege
from app.models.trophy import Trophy, Title
from app.models.forum import Forum
from app.models.tag import TagInformation
from app.utils import unicode_names
import os
import sys
import yaml
import slugify
import readline
from PIL import Image
from PIL.Image import Resampling
help_msg = """
This is the Planète Casio master shell. Type 'exit' or C-D to leave.
Type 'help' to print this message.
Listing commands:
members Show registered community members
groups Show privilege groups
forums Show forum tree
trophies Show trophies
tags Show tags
Install and update commands:
update-all Create or update all assets
update-groups Create or update groups from app/data/
update-forums Create or update the forum tree from app/data/
update-trophies Create or update trophies
update-tags Create or update tag information
generate-trophy-icons Regenerate all trophy icons
create-common-accounts Remove and recreate 'Planète Casio' and 'GLaDOS'
add-group <member> #<id> Add <member> to group #<id> (presumably admins)
enable-user <member> Manually confirm member's email address
"""
#
# Listing commands
#
def members(*args):
for m in Member.query.all():
print(m)
def groups(*args):
for g in Group.query.all():
print(f"#{g.id} {g.name}")
def forums(*args):
for f in Forum.query.all():
parent = f"in {f.parent.url}" if f.parent is not None else "root"
print(f"{f.url} ({parent}) [{f.prefix}]: {f.name}")
print(f" {f.descr}")
def trophies(*args):
for t in Trophy.query.all():
print(t)
def tags(*args):
tags = TagInformation.query.all()
for t in sorted(tags, key=lambda t: t.id):
print(f"{t.id}: {t.pretty}")
#
# Install and update commands
#
def update_all():
update_groups()
update_forums()
update_forums()
update_trophies()
update_tags()
generate_trophy_icons()
def update_groups():
existing = Group.query.all()
gr = []
with open(os.path.join(app.root_path, "data", "groups.yaml")) as fp:
gr = yaml.safe_load(fp.read())
for group_info in gr:
name = group_info["name"]
css = group_info.get("css", "")
descr = group_info.get("descr", "")
privs = group_info.get("privs", "").split()
g = Group.query.filter_by(name=name).first()
# Update an existing group
if g is not None:
changes = (g.css != css) or (g.description != descr) or \
(set(g.privs()) != set(privs))
g.css = css
g.description = descr
for gpriv in g.privileges:
db.session.delete(gpriv)
for priv in privs:
db.session.add(GroupPrivilege(g, priv))
if changes:
db.session.add(g)
print(f"[group] Updated {g.name}")
# Create a new one
else:
g = Group(name, css, descr)
db.session.add(g)
db.session.commit()
for priv in privs:
db.session.add(GroupPrivilege(g, priv))
print(f"[group] Created {g.name}")
db.session.commit()
def update_forums():
# Get current forums
existing = Forum.query.all()
# Get the list of forums we want to end up with
fr = []
success = 0
with open(os.path.join(app.root_path, "data", "forums.yaml")) as fp:
fr = yaml.safe_load(fp.read())
for url, info in fr.items():
if url == "/":
parent = None
else:
parent_url = url.rsplit('/', 1)[0]
if parent_url == "":
parent_url = "/"
parent = Forum.query.filter_by(url=parent_url).first()
if parent is None:
print(f"error: no parent with url {parent_url} for {url}")
continue
descr = info.get("descr", "")
# Either change an existing forum endpoint (same URL) or create one
f = Forum.query.filter_by(url=url).first()
if f is not None:
# No need to change the parent (same URL implies same parent)
changes = (f.name != info["name"]) or (f.prefix != info["prefix"])\
or (f.descr != descr)
f.name = info["name"]
f.prefix = info["prefix"]
f.descr = descr
if changes:
print(f"[forum] Updated {url}")
else:
f = Forum(url, info["name"], info["prefix"], descr, parent)
print(f"[forum] Created {url}")
db.session.add(f)
success += 1
# Remove old forums
for f in existing:
if f.url not in fr:
f.delete()
print(f"[forum] Removed {f.url}")
db.session.commit()
def update_trophies():
existing = Trophy.query.all()
# Get the list of what we want to obtain
with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp:
tr = yaml.safe_load(fp.read())
tr = { t["name"]: t for t in tr }
# Remove trophies that we don't want or that we want as a different type
for t in existing:
if t.name not in tr or isinstance(t, Title) != tr[t.name]["is_title"]:
kind = "title" if isinstance(t, Title) else "trophy"
print(f"[trophies] Deleted '{t.name}' ({kind})")
db.session.delete(t)
db.session.commit()
# Add missing trophies
for name, t in tr.items():
description = t.get("description", "")
css = t.get("css", "")
trophy = Trophy.query.filter_by(name=name).first()
if "css" in t and not t["is_title"]:
print(f"[trophies] CSS on '{name}' is meaningless (not a title)")
# Updating existing trophies
if trophy is not None:
changes = (trophy.description != description) or \
(trophy.hidden != t["hidden"] or (isinstance(trophy,Title) and \
trophy.css != css))
trophy.description = description
trophy.hidden = t["hidden"]
if isinstance(trophy, Title):
trophy.css = css
if changes:
print(f"[trophies] Updated '{name}'")
# Add missing ones
elif t["is_title"]:
trophy = Title(name, description, t["hidden"], t.get("css", ""))
print(f"[trophies] Created '{name}' (title)")
else:
trophy = Trophy(name, description, t["hidden"])
print(f"[trophies] Created '{name}' (trophy)")
db.session.add(trophy)
db.session.commit()
def update_tags():
existing = TagInformation.query.all()
with open(os.path.join(app.root_path, "data", "tags.yaml")) as fp:
data = yaml.safe_load(fp.read())
tags = { ctgy + "." + name: data[ctgy][name]
for ctgy in data for name in data[ctgy] }
# Remove bad tags
for t in existing:
if t.id not in tags:
print(f"[tags] Deleted '{t.id}'")
db.session.delete(t)
db.session.commit()
for name, info in tags.items():
pretty = info.get("pretty", name)
t = TagInformation.query.filter_by(id=name).first()
if t is not None:
changes = (t.pretty != pretty)
t.pretty = pretty
if changes:
print(f"[tags] Updated '{name}'")
else:
t = TagInformation(name)
t.pretty = pretty
print(f"[tags] Created '{name}'")
db.session.add(t)
db.session.commit()
def generate_trophy_icons():
tr = []
with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp:
tr = yaml.safe_load(fp.read())
names = [slugify.slugify(t["name"]) for t in tr]
src = os.path.join(app.root_path, "data", "trophies.png")
dst = os.path.join(app.root_path, "static", "images", "trophies")
try:
os.mkdir(dst)
except FileExistsError:
pass
img = Image.open(src)
def trophy_iterator(img):
for y in range(img.height // 26):
for x in range(img.width // 26):
icon = img.crop((26*x+1, 26*y+1, 26*x+25, 26*y+25))
# Skip blank squares in the source image
if len(icon.getcolors()) > 1:
yield icon.resize((48,48), resample=Resampling.NEAREST)
for (name, icon) in zip(names, trophy_iterator(img)):
icon.save(os.path.join(dst, f"{name}.png"))
def create_common_accounts():
# Clean up common accounts
for name in "PlanèteCasio GLaDOS".split():
m = Member.query.filter_by(name=name).first()
if m is not None:
m.delete()
# Recreate theme
def addgroup(member, group):
g = Group.query.filter_by(name=group).first()
if g is not None:
member.groups.append(g)
m = Member("PlanèteCasio", "contact@planet-casio.com", "nologin")
addgroup(m, "Compte communautaire")
addgroup(m, "No login")
db.session.add(m)
m = Member("GLaDOS", "glados@aperture.science", "nologin")
m.xp = 1338
addgroup(m, "Robot")
addgroup(m, "No login")
db.session.add(m)
db.session.commit()
db.session.add(SpecialPrivilege(m, "edit.posts"))
db.session.add(SpecialPrivilege(m, "shoutbox.ban"))
db.session.commit()
def add_group(member, group):
if group[0] != "#":
print(f"error: group id {group} should start with '#'")
return
gid = int(group[1:])
norm = unicode_names.normalize(member)
g = Group.query.filter_by(id=gid).first()
m = Member.query.filter_by(norm=norm).first()
if m is None:
print(f"error: no member has a normalized name of '{norm}'")
return
m.groups.append(g)
db.session.add(m)
db.session.commit()
def enable_user(member):
norm = unicode_names.normalize(member)
m = Member.query.filter_by(norm=norm).first()
if m is None:
print(f"error: no member has a normalized name of '{norm}'")
return
m.email_confirmed = True
db.session.add(m)
db.session.commit()
#
# Main program
#
commands = {
"exit": lambda: sys.exit(0),
"help": lambda: print(help_msg),
"members": members,
"groups": groups,
"forums": forums,
"trophies": trophies,
"tags": tags,
"update-all": update_all,
"update-groups": update_groups,
"update-forums": update_forums,
"update-trophies": update_trophies,
"update-tags": update_tags,
"generate-trophy-icons": generate_trophy_icons,
"create-common-accounts": create_common_accounts,
"add-group": add_group,
"enable-user": enable_user,
}
def execute(cmd):
if cmd[0] not in commands:
print(f"error: unknown command '{cmd[0]}'")
else:
commands[cmd[0]](*cmd[1:])
# If a command is specified on the command-line, use it and do not prompt
if len(sys.argv) > 1:
execute(sys.argv[1:])
sys.exit(0)
# Otherwise, prompt interactively
else:
print(help_msg)
while True:
try:
cmd = input("@> ").split()
except EOFError:
print("^D")
sys.exit(0)
except KeyboardInterrupt:
print("^C")
sys.exit(0)
if cmd:
execute(cmd)