You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
346 lines
10 KiB
346 lines
10 KiB
#! /usr/bin/python3
|
|
|
|
from app import app, db
|
|
from app.models.user import Member, Group, GroupPrivilege
|
|
from app.models.priv import SpecialPrivilege
|
|
from app.models.trophy import Trophy, Title
|
|
from app.models.forum import Forum
|
|
from app.utils import unicode_names
|
|
import os
|
|
import sys
|
|
import yaml
|
|
import slugify
|
|
import readline
|
|
from PIL import Image
|
|
|
|
|
|
help_msg = """
|
|
This is the Planète Casio master shell. Type 'exit' or C-D to leave.
|
|
Type 'help' to print this message.
|
|
|
|
Listing commands:
|
|
members Show registered community members
|
|
groups Show privilege groups
|
|
forums Show forum tree
|
|
trophies Show trophies
|
|
|
|
Install and update commands:
|
|
update-groups Create or update groups from app/data/
|
|
update-forums Create or update the forum tree from app/data/
|
|
update-trophies Create or update trophies
|
|
generate-trophy-icons Regenerate all trophy icons
|
|
create-common-accounts Remove and recreate 'Planète Casio' and 'GLaDOS'
|
|
add-group <member> #<id> Add <member> to group #<id> (presumably admins)
|
|
enable-user <member> Manually confirm member's email address
|
|
"""
|
|
|
|
#
|
|
# Listing commands
|
|
#
|
|
|
|
def members(*args):
|
|
for m in Member.query.all():
|
|
print(m)
|
|
|
|
def groups(*args):
|
|
for g in Group.query.all():
|
|
print(f"#{g.id} {g.name}")
|
|
|
|
def forums(*args):
|
|
for f in Forum.query.all():
|
|
parent = f"in {f.parent.url}" if f.parent is not None else "root"
|
|
print(f"{f.url} ({parent}) [{f.prefix}]: {f.name}")
|
|
print(f" {f.descr}")
|
|
|
|
def trophies(*args):
|
|
for t in Trophy.query.all():
|
|
print(t)
|
|
|
|
#
|
|
# Install and update commands
|
|
#
|
|
|
|
def update_groups():
|
|
existing = Group.query.all()
|
|
|
|
gr = []
|
|
with open(os.path.join(app.root_path, "data", "groups.yaml")) as fp:
|
|
gr = yaml.safe_load(fp.read())
|
|
|
|
for group_info in gr:
|
|
name = group_info["name"]
|
|
css = group_info.get("css", "")
|
|
descr = group_info.get("descr", "")
|
|
privs = group_info.get("privs", "").split()
|
|
|
|
g = Group.query.filter_by(name=name).first()
|
|
|
|
# Update an existing group
|
|
if g is not None:
|
|
changes = (g.css != css) or (g.description != descr) or \
|
|
(set(g.privs()) != set(privs))
|
|
g.css = css
|
|
g.description = descr
|
|
|
|
for gpriv in GroupPrivilege.query.filter_by(gid=g.id):
|
|
db.session.delete(gpriv)
|
|
for priv in privs:
|
|
db.session.add(GroupPrivilege(g, priv))
|
|
|
|
if changes:
|
|
db.session.add(g)
|
|
print(f"[group] Updated {g.name}")
|
|
# Create a new one
|
|
else:
|
|
g = Group(name, css, descr)
|
|
db.session.add(g)
|
|
db.session.commit()
|
|
|
|
for priv in privs:
|
|
db.session.add(GroupPrivilege(g, priv))
|
|
|
|
print(f"[group] Created {g.name}")
|
|
|
|
db.session.commit()
|
|
|
|
|
|
def update_forums():
|
|
# Get current forums
|
|
existing = Forum.query.all()
|
|
|
|
# Get the list of forums we want to end up with
|
|
fr = []
|
|
success = 0
|
|
with open(os.path.join(app.root_path, "data", "forums.yaml")) as fp:
|
|
fr = yaml.safe_load(fp.read())
|
|
|
|
for url, info in fr.items():
|
|
if url == "/":
|
|
parent = None
|
|
else:
|
|
parent_url = url.rsplit('/', 1)[0]
|
|
if parent_url == "":
|
|
parent_url = "/"
|
|
parent = Forum.query.filter_by(url=parent_url).first()
|
|
|
|
if parent is None:
|
|
print(f"error: no parent with url {parent_url} for {url}")
|
|
continue
|
|
|
|
descr = info.get("descr", "")
|
|
|
|
# Either change an existing forum endpoint (same URL) or create one
|
|
f = Forum.query.filter_by(url=url).first()
|
|
if f is not None:
|
|
# No need to change the parent (same URL implies same parent)
|
|
changes = (f.name != info["name"]) or (f.prefix != info["prefix"])\
|
|
or (f.descr != descr)
|
|
f.name = info["name"]
|
|
f.prefix = info["prefix"]
|
|
f.descr = descr
|
|
if changes:
|
|
print(f"[forum] Updated {url}")
|
|
else:
|
|
f = Forum(url, info["name"], info["prefix"], descr, parent)
|
|
print(f"[forum] Created {url}")
|
|
|
|
db.session.add(f)
|
|
success += 1
|
|
|
|
# Remove old forums
|
|
for f in existing:
|
|
if f.url not in fr:
|
|
f.delete()
|
|
print(f"[forum] Removed {f.url}")
|
|
|
|
db.session.commit()
|
|
|
|
|
|
def update_trophies():
|
|
existing = Trophy.query.all()
|
|
|
|
# Get the list of what we want to obtain
|
|
tr = []
|
|
with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp:
|
|
tr = yaml.safe_load(fp.read())
|
|
tr = { t["name"]: t for t in tr }
|
|
|
|
# Remove trophies that we don't want or that we want as a different type
|
|
for t in existing:
|
|
if t.name not in tr or isinstance(t, Title) != tr[t.name]["is_title"]:
|
|
kind = "title" if isinstance(t, Title) else "trophy"
|
|
print(f"[trophies] Deleted '{t.name}' ({kind})")
|
|
db.session.delete(t)
|
|
db.session.commit()
|
|
|
|
# Add missing trophies
|
|
for name, t in tr.items():
|
|
description = t.get("description", "")
|
|
css = t.get("css", "")
|
|
trophy = Trophy.query.filter_by(name=name).first()
|
|
|
|
if "css" in t and not t["is_title"]:
|
|
print(f"[trophies] CSS on '{name}' is meaningless (not a title)")
|
|
|
|
# Updating existing trophies
|
|
if trophy is not None:
|
|
changes = (trophy.description != description) or \
|
|
(trophy.hidden != t["hidden"] or (isinstance(trophy,Title) and \
|
|
trophy.css != css))
|
|
trophy.description = description
|
|
trophy.hidden = t["hidden"]
|
|
if isinstance(trophy, Title):
|
|
trophy.css = css
|
|
if changes:
|
|
print(f"[trophies] Updated '{name}'")
|
|
|
|
# Add missing ones
|
|
elif t["is_title"]:
|
|
trophy = Title(name, description, t["hidden"], t.get("css", ""))
|
|
print(f"[trophies] Created '{name}' (title)")
|
|
else:
|
|
trophy = Trophy(name, description, t["hidden"])
|
|
print(f"[trophies] Created '{name}' (trophy)")
|
|
|
|
db.session.add(trophy)
|
|
|
|
db.session.commit()
|
|
|
|
|
|
def generate_trophy_icons():
|
|
tr = []
|
|
with open(os.path.join(app.root_path, "data", "trophies.yaml")) as fp:
|
|
tr = yaml.safe_load(fp.read())
|
|
|
|
names = [slugify.slugify(t["name"]) for t in tr]
|
|
src = os.path.join(app.root_path, "data", "trophies.png")
|
|
dst = os.path.join(app.root_path, "static", "images", "trophies")
|
|
|
|
try:
|
|
os.mkdir(dst)
|
|
except FileExistsError:
|
|
pass
|
|
|
|
img = Image.open(src)
|
|
|
|
def trophy_iterator(img):
|
|
for y in range(img.height // 26):
|
|
for x in range(img.width // 26):
|
|
icon = img.crop((26*x+1, 26*y+1, 26*x+25, 26*y+25))
|
|
# Skip blank squares in the source image
|
|
if len(icon.getcolors()) > 1:
|
|
yield icon.resize((48,48), resample=Image.NEAREST)
|
|
|
|
for (name, icon) in zip(names, trophy_iterator(img)):
|
|
icon.save(os.path.join(dst, f"{name}.png"))
|
|
|
|
|
|
def create_common_accounts():
|
|
# Clean up common accounts
|
|
for name in "PlanèteCasio GLaDOS".split():
|
|
m = Member.query.filter_by(name=name).first()
|
|
if m is not None:
|
|
m.delete()
|
|
|
|
# Recreate theme
|
|
def addgroup(member, group):
|
|
g = Group.query.filter_by(name=group).first()
|
|
if g is not None:
|
|
member.groups.append(g)
|
|
|
|
m = Member("PlanèteCasio", "contact@planet-casio.com", "nologin")
|
|
addgroup(m, "Compte communautaire")
|
|
addgroup(m, "No login")
|
|
db.session.add(m)
|
|
|
|
m = Member("GLaDOS", "glados@aperture.science", "nologin")
|
|
m.xp = 1338
|
|
addgroup(m, "Robot")
|
|
addgroup(m, "No login")
|
|
db.session.add(m)
|
|
db.session.commit()
|
|
|
|
db.session.add(SpecialPrivilege(m, "edit.posts"))
|
|
db.session.add(SpecialPrivilege(m, "shoutbox.ban"))
|
|
|
|
db.session.commit()
|
|
|
|
|
|
def add_group(member, group):
|
|
if group[0] != "#":
|
|
print(f"error: group id {group} should start with '#'")
|
|
return
|
|
gid = int(group[1:])
|
|
|
|
norm = unicode_names.normalize(member)
|
|
|
|
g = Group.query.filter_by(id=gid).first()
|
|
m = Member.query.filter_by(norm=norm).first()
|
|
|
|
if m is None:
|
|
print(f"error: no member has a normalized name of '{norm}'")
|
|
return
|
|
|
|
m.groups.append(g)
|
|
db.session.add(m)
|
|
db.session.commit()
|
|
|
|
|
|
def enable_user(member):
|
|
norm = unicode_names.normalize(member)
|
|
m = Member.query.filter_by(norm=norm).first()
|
|
if m is None:
|
|
print(f"error: no member has a normalized name of '{norm}'")
|
|
return
|
|
|
|
m.email_confirmed = True
|
|
db.session.add(m)
|
|
db.session.commit()
|
|
|
|
#
|
|
# Main program
|
|
#
|
|
|
|
commands = {
|
|
"exit": lambda: sys.exit(0),
|
|
"help": lambda: print(help_msg),
|
|
"members": members,
|
|
"groups": groups,
|
|
"forums": forums,
|
|
"trophies": trophies,
|
|
"update-groups": update_groups,
|
|
"update-forums": update_forums,
|
|
"update-trophies": update_trophies,
|
|
"generate-trophy-icons": generate_trophy_icons,
|
|
"create-common-accounts": create_common_accounts,
|
|
"add-group": add_group,
|
|
"enable-user": enable_user,
|
|
}
|
|
|
|
def execute(cmd):
|
|
if cmd[0] not in commands:
|
|
print(f"error: unknown command '{cmd[0]}'")
|
|
else:
|
|
commands[cmd[0]](*cmd[1:])
|
|
|
|
# If a command is specified on the command-line, use it and do not prompt
|
|
if len(sys.argv) > 1:
|
|
execute(sys.argv[1:])
|
|
sys.exit(0)
|
|
# Otherwise, prompt interactively
|
|
else:
|
|
print(help_msg)
|
|
|
|
while True:
|
|
try:
|
|
cmd = input("@> ").split()
|
|
except EOFError:
|
|
print("^D")
|
|
sys.exit(0)
|
|
except KeyboardInterrupt:
|
|
print("^C")
|
|
sys.exit(0)
|
|
|
|
if cmd:
|
|
execute(cmd)
|