# PythonExtra/py/makecompresseddata.py

from __future__ import print_function

import collections
import re
import sys

import gzip
import zlib

_COMPRESSED_MARKER = 0xFF
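# The marker above is the first byte of every compressed string; because
# check_non_ascii() below rejects input bytes >= 0x80, the runtime can use it
# to tell compressed messages apart from uncompressed ones.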


def check_non_ascii(msg):
    for c in msg:
        if ord(c) >= 0x80:
            print(
                'Unable to generate compressed data: message "{}" contains a non-ascii character "{}".'.format(
                    msg, c
                ),
                file=sys.stderr,
            )
            sys.exit(1)


# Replace <char><space> with <char | 0x80>.
# Trivial scheme to demo/test.
def space_compression(error_strings):
    for line in error_strings:
        check_non_ascii(line)
        result = ""
        for i in range(len(line)):
            if i > 0 and line[i] == " ":
                # Drop the space and re-emit the previous char with its high
                # bit set, as described in the comment above.
                result = result[:-1]
                result += "\\{:03o}".format(ord(line[i - 1]) | 0x80)
            else:
                result += line[i]
        error_strings[line] = result
    return None
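

# A minimal sketch (hypothetical helper, never called by the build) of what
# the scheme above produces: in "a b" the space is dropped and "a" is
# re-emitted with its high bit set, i.e. as the octal escape \341.
def _demo_space_compression():
    demo = collections.OrderedDict({"a b": None})
    space_compression(demo)
    assert demo["a b"] == "\\341b"
    return demo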


# Replace common words with <0x80 | index>.
# Index is into a table of words stored as aaaaa<0x80|a>bbb<0x80|b>...
# Replaced words are assumed to have spaces either side to avoid having to
# store the spaces in the compressed strings.
def word_compression(error_strings):
    topn = collections.Counter()

    for line in error_strings.keys():
        check_non_ascii(line)
        for word in line.split(" "):
            topn[word] += 1

    # Order not just by frequency, but by expected saving, i.e. prefer a
    # longer string that is used less frequently.
    # Use the word itself for ties so that compression is deterministic.
    def bytes_saved(item):
        w, n = item
        return -((len(w) + 1) * (n - 1)), w

    top128 = sorted(topn.items(), key=bytes_saved)[:128]

    index = [w for w, _ in top128]
    index_lookup = {w: i for i, w in enumerate(index)}

    for line in error_strings.keys():
        result = ""
        need_space = False
        for word in line.split(" "):
            if word in index_lookup:
                result += "\\{:03o}".format(0b10000000 | index_lookup[word])
                need_space = False
            else:
                if need_space:
                    result += " "
                need_space = True
                result += word
        error_strings[line] = result.strip()

    return "".join(w[:-1] + "\\{:03o}".format(0b10000000 | ord(w[-1])) for w in index)


# Replace chars in text with variable length bit sequence.
# For comparison only (the table is not emitted).
def huffman_compression(error_strings):
    # https://github.com/tannewt/huffman
    import huffman

    all_strings = "".join(error_strings)
    cb = huffman.codebook(collections.Counter(all_strings).items())

    for line in error_strings:
        b = "1"
        for c in line:
            b += cb[c]
        n = len(b)
        if n % 8 != 0:
            n += 8 - (n % 8)
        result = ""
        for i in range(0, n, 8):
            result += "\\{:03o}".format(int(b[i : i + 8], 2))
        if len(result) > len(line) * 4:
            result = line
        error_strings[line] = result

    # TODO: This would be the prefix lengths and the table ordering.
    return "_" * (10 + len(cb))


# Replace common N-letter sequences with <0x80 | index>, where
# the common sequences are stored in a separate table.
# This isn't very useful; it needs a smarter way to find the top ngrams.
def ngram_compression(error_strings):
    topn = collections.Counter()
    N = 2

    for line in error_strings.keys():
        check_non_ascii(line)
        if len(line) < N:
            continue
        for i in range(0, len(line) - N, N):
            topn[line[i : i + N]] += 1

    def bytes_saved(item):
        w, n = item
        return -(len(w) * (n - 1))

    top128 = sorted(topn.items(), key=bytes_saved)[:128]

    index = [w for w, _ in top128]
    index_lookup = {w: i for i, w in enumerate(index)}

    for line in error_strings.keys():
        result = ""
        for i in range(0, len(line) - N + 1, N):
            word = line[i : i + N]
            if word in index_lookup:
                result += "\\{:03o}".format(0b10000000 | index_lookup[word])
            else:
                result += word
        if len(line) % N != 0:
            result += line[len(line) - len(line) % N :]
        error_strings[line] = result.strip()

    return "".join(index)


def main(collected_path, fn):
    error_strings = collections.OrderedDict()
    max_uncompressed_len = 0
    num_uses = 0

    # Read in all MP_ERROR_TEXT strings.
    with open(collected_path, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            num_uses += 1
            error_strings[line] = None
            max_uncompressed_len = max(max_uncompressed_len, len(line))

    # So that objexcept.c can figure out how big the buffer needs to be.
    print("#define MP_MAX_UNCOMPRESSED_TEXT_LEN ({})".format(max_uncompressed_len))

    # Run the compression.
    compressed_data = fn(error_strings)

    # Print the data table.
    print('MP_COMPRESSED_DATA("{}")'.format(compressed_data))

    # Print the replacements.
    for uncomp, comp in error_strings.items():
        if uncomp == comp:
            prefix = ""
        else:
            prefix = "\\{:03o}".format(_COMPRESSED_MARKER)
        print('MP_MATCH_COMPRESSED("{}", "{}{}")'.format(uncomp, prefix, comp))
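
    # For illustration only (hypothetical strings): with word_compression and
    # "error" as table entry 0, the loop above would emit a line shaped like
    #   MP_MATCH_COMPRESSED("syntax error", "\377\202\200")
    # where \377 is _COMPRESSED_MARKER and each \2xx escape indexes the
    # MP_COMPRESSED_DATA table.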

    # Used to calculate the "true" length of the (escaped) compressed strings.
    def unescape(s):
        return re.sub(r"\\\d\d\d", "!", s)

    # Stats. Note this doesn't include the cost of the decompressor code.
    uncomp_len = sum(len(s) + 1 for s in error_strings.keys())
    comp_len = sum(1 + len(unescape(s)) + 1 for s in error_strings.values())
    data_len = len(compressed_data) + 1 if compressed_data else 0
    print("// Total input length: {}".format(uncomp_len))
    print("// Total compressed length: {}".format(comp_len))
    print("// Total data length: {}".format(data_len))
    print("// Predicted saving: {}".format(uncomp_len - comp_len - data_len))

    # Somewhat meaningless comparison to zlib/gzip.
    all_input_bytes = "\\0".join(error_strings.keys()).encode()
    print()
    if hasattr(gzip, "compress"):
        gzip_len = len(gzip.compress(all_input_bytes)) + num_uses * 4
        print("// gzip length: {}".format(gzip_len))
        print("// Percentage of gzip: {:.1f}%".format(100 * (comp_len + data_len) / gzip_len))
    if hasattr(zlib, "compress"):
        zlib_len = len(zlib.compress(all_input_bytes)) + num_uses * 4
        print("// zlib length: {}".format(zlib_len))
        print("// Percentage of zlib: {:.1f}%".format(100 * (comp_len + data_len) / zlib_len))


if __name__ == "__main__":
    main(sys.argv[1], word_compression)
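
# Typical invocation (paths illustrative; exact names depend on the build):
# the build system collects every MP_ERROR_TEXT(...) string into one file,
# then runs this script to produce the generated header consumed by
# py/objexcept.c, e.g.
#
#   python makecompresseddata.py build/genhdr/compressed.collected \
#       > build/genhdr/compressed.data.h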