diff --git a/rebuild_db.py b/rebuild_db.py index 1ef9580..17f193e 100755 --- a/rebuild_db.py +++ b/rebuild_db.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -import functools # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or @@ -67,7 +66,9 @@ __email__ = "martin.fiedler@gmx.net" * initial public release, Win32 only """ -import sys, os, os.path, array, getopt, random, fnmatch, operator, string +import functools, sys, os, os.path, array, random, fnmatch, operator, string +from io import BytesIO +from typing import TextIO # @formatter:off KnownProps = ('filename', 'size', 'ignore', 'type', 'shuffle', 'reuse', 'bookmark') @@ -83,69 +84,43 @@ Rules = [ ] # @formatter:on -Options = { - "volume": None, - "interactive": False, - "smart": True, - "home": True, - "logging": True, - "reuse": 1, - "logfile": "rebuild_db.log.txt", - "rename": False -} domains = [] total_count = 0 KnownEntries = {} +logfile: TextIO | None = None +args = None + +iTunesSD_file: BytesIO | None = None + ################################################################################ def open_log(): global logfile - if Options['logging']: - try: - logfile = open(Options['logfile'], "w") - except IOError: - logfile = None - else: - logfile = None + if not args.nolog: + try: logfile = open(args.logfile, "w") + except IOError:logfile = None + else: logfile = None def log(line="", newline=True): global logfile - if newline: - print(line) - line += "\n" - else: - print(line, end=' ') - line += " " - if logfile: - try: - logfile.write(line) - except IOError: - pass + if newline: line += "\n" + else: line += " " + + print(line, end="") + if logfile: logfile.write(line) def close_log(): global logfile - if logfile: - logfile.close() - - -def go_home(): - if Options['home']: - try: - os.chdir(os.path.split(sys.argv[0])[0]) - except OSError: - pass - + if logfile: logfile.close() def filesize(filename): - try: - return os.stat(filename)[6] - except OSError: - return None + try: return os.stat(filename)[6] + except OSError: return None ################################################################################ @@ -156,25 +131,18 @@ def MatchRule(props, rule): prop, op, ref = props[rule[0]], rule[1], rule[2] except KeyError: return False - if rule[1] == '~': - return fnmatch.fnmatchcase(prop.lower(), ref.lower()) - elif rule[1] == '=': - return prop == ref - elif rule[1] == '>': - return prop > ref - elif rule[1] == '<': - return prop < ref - else: - return False + if rule[1] == '~': return fnmatch.fnmatchcase(prop.lower(), ref.lower()) + elif rule[1] == '=': return prop == ref + elif rule[1] == '>': return prop > ref + elif rule[1] == '<': return prop < ref + else: return False def ParseValue(val): if len(val) >= 2 and ((val[0] == "'" and val[-1] == "'") or (val[0] == '"' and val[-1] == '"')): return val[1:-1] - try: - return int(val) - except ValueError: - return val + try: return int(val) + except ValueError: return val def ParseRule(rule): @@ -182,14 +150,14 @@ def ParseRule(rule): prop = rule[:sep_pos].strip() if not prop in KnownProps: log("WARNING: unknown property `%s'" % prop) - return (prop, rule[sep_pos], ParseValue(rule[sep_pos + 1:].strip())) + return prop, rule[sep_pos], ParseValue(rule[sep_pos + 1:].strip()) def ParseAction(action): prop, value = list(map(str.strip, action.split('=', 1))) if not prop in KnownProps: log("WARNING: unknown property `%s'" % prop) - return (prop, ParseValue(value)) + return prop, ParseValue(value) def ParseRuleLine(line): @@ -202,9 +170,9 @@ def ParseRuleLine(line): ruleset = list(map(str.strip, ":".join(tmp[:-1]).split(","))) actions = dict(list(map(ParseAction, tmp[-1].split(",")))) if len(ruleset) == 1 and not (ruleset[0]): - return ([], actions) + return [], actions else: - return (list(map(ParseRule, ruleset)), actions) + return list(map(ParseRule, ruleset)), actions except OSError: # (ValueError,IndexError,KeyError): log("WARNING: rule `%s' is malformed, ignoring" % line) return None @@ -214,9 +182,8 @@ def ParseRuleLine(line): def safe_char(c): - if c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_": - return c - return "_" + if c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_": return c + else: return "_" def rename_safely(path, name): @@ -238,7 +205,7 @@ def rename_safely(path, name): def write_to_db(filename): - global iTunesSD, domains, total_count, KnownEntries, Rules + global iTunesSD_file, domains, total_count, KnownEntries, Rules # set default properties props = { @@ -247,7 +214,7 @@ def write_to_db(filename): 'ignore': 0, 'type': 1, 'shuffle': 1, - 'reuse': Options['reuse'], + 'reuse': not args.force, 'bookmark': 0 } @@ -266,7 +233,7 @@ def write_to_db(filename): "\0" * (525 - 2 * len(filename))).encode() # write entry, modifying shuffleflag and bookmarkflag at least - iTunesSD.write(entry[:555] + bytes([props['shuffle']]) + bytes([props['bookmark']]) + bytes([entry[557]])) + iTunesSD_file.write(entry[:555] + bytes([props['shuffle']]) + bytes([props['bookmark']]) + bytes([entry[557]])) if props['shuffle']: domains[-1].append(total_count) total_count += 1 return 1 @@ -281,21 +248,18 @@ def make_key(s): for j in range(i, len(s)): if not s[j].isdigit(): break if s[j].isdigit(): j += 1 - return (s[:i], int(s[i:j]), make_key(s[j:])) + return s[:i], int(s[i:j]), make_key(s[j:]) def key_repr(x): - if type(x) == tuple: - return b"%s%d%s" % (x[0], x[1], key_repr(x[2])) - else: - return x + if type(x) == tuple: return b"%s%d%s" % (x[0], x[1], key_repr(x[2])) + else: return x def cmp_key(a, b): if type(a) == tuple and type(b) == tuple: return cmp(a[0], b[0]) or cmp(a[1], b[1]) or cmp_key(a[2], b[2]) - else: - return cmp(key_repr(a), key_repr(b)) + else: return cmp(key_repr(a), key_repr(b)) def cmp(a, b): if a < b: return -1 @@ -306,7 +270,7 @@ def cmp(a, b): def file_entry(path, name, prefix=""): if not name or name[0] == ".": return None fullname = "%s/%s" % (path, name) - may_rename = not (fullname.startswith("./iPod_Control")) and Options['rename'] + may_rename = not (fullname.startswith("./iPod_Control")) and args.rename try: if os.path.islink(fullname): return None @@ -321,32 +285,26 @@ def file_entry(path, name, prefix=""): return None -def browse(path, interactive): +def browse(path: string, interactive: bool): global domains if path[-1] == "/": path = path[:-1] displaypath = path[1:] if not displaypath: displaypath = "/" - if interactive: - while 1: - try: - choice = input("include `%s'? [(Y)es, (N)o, (A)ll] " % displaypath)[:1].lower() - except EOFError: - raise KeyboardInterrupt - if not choice: continue - if choice in "at": # all/alle/tous/ - interactive = 0 - break - if choice in "yjos": # yes/ja/oui/si - break - if choice in "n": # no/nein/non/non? - return 0 + while interactive: + choice = input("include `%s'? [(Y)es, (N)o, (A)ll] " % displaypath)[:1].lower() + if not choice: continue - try: - files = [_f for _f in [file_entry(path, name) for name in os.listdir(path)] if _f] - except OSError: - return + # all/alle/tous/ + if choice in "at": interactive = 0 + # yes/ja/oui/si + if choice in "yjos": break + # no/nein/non/non? + if choice in "n": return + + try: files = [_f for _f in [file_entry(path, name) for name in os.listdir(path)] if _f] + except OSError: return if path == "./iPod_Control/Music": subdirs = [x[2] for x in files if not x[0]] @@ -354,10 +312,10 @@ def browse(path, interactive): for dir in subdirs: subpath = "%s/%s" % (path, dir) try: - files.extend( - [x for x in [file_entry(subpath, name, dir + "/") for name in os.listdir(subpath)] if x and x[0]]) - except OSError: - pass + files.extend([x for x in [ + file_entry(subpath, name, dir + "/") for name in os.listdir(subpath) + ] if x and x[0]]) + except OSError: pass files.sort(key = functools.cmp_to_key(cmp_key)) count = len([None for x in files if x[0]]) @@ -371,47 +329,43 @@ def browse(path, interactive): else: browse(fullname, interactive) - if real_count == count: - log("%s: %d files" % (displaypath, count)) - else: - log("%s: %d files (out of %d)" % (displaypath, real_count, count)) + log("%s: %d files (out of %d)" % (displaypath, real_count, count)) ################################################################################ -def stringval(i): +def stringval(i: int) -> bytes: if i < 0: i += 0x1000000 return b"%c%c%c" % (i & 0xFF, (i >> 8) & 0xFF, (i >> 16) & 0xFF) -def listval(i): +def listval(i: int) -> list[int]: if i < 0: i += 0x1000000 return [i & 0xFF, (i >> 8) & 0xFF, (i >> 16) & 0xFF] -def make_playback_state(volume=None): +def write_playback_state(): # I'm not at all proud of this function. Why can't stupid Python make strings # mutable?! log("Setting playback state ...", False) - PState = [] + p_state = [] try: f = open("iPod_Control/iTunes/iTunesPState", "rb") a = array.array('B') a.frombytes(f.read()) - PState = a.tolist() + p_state = a.tolist() f.close() except IOError: - del PState[:] + del p_state[:] #if len(PState) != 21: - # print("catstare") # PState = listval(29) + [0] * 15 + listval(1) # volume 29, FW ver 1.0 - PState[3:15] = [0] * 6 + [1] + [0] * 5 # track 0, shuffle mode, start of track - if volume is not None: - PState[:3] = listval(volume) + p_state[3:15] = [0] * 6 + [1] + [0] * 5 # track 0, shuffle mode, start of track + if args.volume is not None: + p_state[:3] = listval(args.volume) try: f = open("iPod_Control/iTunes/iTunesPState", "wb") - array.array('B', PState).tofile(f) + array.array('B', p_state).tofile(f) f.close() except IOError: log("FAILED.") @@ -420,7 +374,7 @@ def make_playback_state(volume=None): return 1 -def make_stats(count): +def write_stats(count): log("Creating statistics file ...", False) try: file = open("iPod_Control/iTunes/iTunesStats", "wb") @@ -437,10 +391,9 @@ def make_stats(count): def smart_shuffle(): - try: - slice_count = max(list(map(len, domains))) - except ValueError: - return [] + try: slice_count = max(list(map(len, domains))) + except ValueError: return [] + slices = [[] for x in range(slice_count)] slice_fill = [0] * slice_count @@ -451,7 +404,8 @@ def smart_shuffle(): # find slices where the nearest track of the same domain is far away metric = [ min([slice_count] + [min(abs(s - u), abs(s - u + slice_count), abs(s - u - slice_count)) for u in used]) - for s in range(slice_count)] + for s in range(slice_count) + ] thresh = (max(metric) + 1) // 2 farthest = [s for s in range(slice_count) if metric[s] >= thresh] @@ -461,7 +415,7 @@ def smart_shuffle(): # choose one of the remaining candidates and add the track to the chosen slice s = random.choice(emptiest or farthest) - slices[s].append((n, d)) + slices[s].append([n, d]) slice_fill[s] += 1 used.append(s) @@ -477,15 +431,15 @@ def smart_shuffle(): return seq -def make_shuffle(count): +def write_shuffle(count): random.seed() - if Options['smart']: - log("Generating smart shuffle sequence ...", False) - seq = smart_shuffle() - else: + if args.nosmart: log("Generating shuffle sequence ...", False) seq = list(range(count)) random.shuffle(seq) + else: + log("Generating smart shuffle sequence ...", False) + seq = smart_shuffle() try: with open("iPod_Control/iTunes/iTunesShuffle", "wb") as file: file.write(b"".join(map(stringval, seq))) @@ -500,7 +454,7 @@ def make_shuffle(count): def main(dirs): - global header, iTunesSD, total_count, KnownEntries, Rules + global header, iTunesSD_file, total_count, KnownEntries, Rules log("Welcome to %s, version %s" % (__title__, __version__)) log() @@ -519,29 +473,26 @@ Please make sure that: sys.exit(1) header = array.array('B') - iTunesSD = None try: - iTunesSD = open("iPod_Control/iTunes/iTunesSD", "rb") - header.fromfile(iTunesSD, 51) - if Options['reuse']: - iTunesSD.seek(18) - entry = iTunesSD.read(558) + iTunesSD_file = open("iPod_Control/iTunes/iTunesSD", "rb") + header.fromfile(iTunesSD_file, 51) + if not args.force: + iTunesSD_file.seek(18) + entry = iTunesSD_file.read(558) while len(entry) == 558: filename = entry[33::2].split(b"\0", 1)[0] KnownEntries[filename] = entry - entry = iTunesSD.read(558) + entry = iTunesSD_file.read(558) except (IOError, EOFError): pass - if iTunesSD: iTunesSD.close() + if iTunesSD_file: iTunesSD_file.close() if len(header) == 51: log("Using iTunesSD headers from existing database.") - if KnownEntries: - log("Collected %d entries from existing database." % len(KnownEntries)) + if KnownEntries: log("Collected %d entries from existing database." % len(KnownEntries)) else: del header[18:] - if len(header) == 18: - log("Using iTunesSD main header from existing database.") + if len(header) == 18: log("Using iTunesSD main header from existing database.") else: del header[:] log("Rebuilding iTunesSD main header from scratch.") @@ -551,8 +502,8 @@ Please make sure that: log() try: - iTunesSD = open("iPod_Control/iTunes/iTunesSD", "wb") - header[:18].tofile(iTunesSD) + iTunesSD_file = open("iPod_Control/iTunes/iTunesSD", "wb") + header[:18].tofile(iTunesSD_file) except IOError: log("""ERROR: Cannot write to the iPod database file (iTunesSD)! Please make sure that: @@ -563,25 +514,22 @@ Please make sure that: log("Searching for files on your iPod.") try: - if dirs: - for dir in dirs: - browse("./" + dir, Options['interactive']) - else: - browse(".", Options['interactive']) + for dir in dirs: + browse("./" + dir, args.interactive) log("%d playable files were found on your iPod." % total_count) log() log("Fixing iTunesSD header.") - iTunesSD.seek(0) - iTunesSD.write(b"\0%c%c" % (total_count >> 8, total_count & 0xFF)) - iTunesSD.close() + iTunesSD_file.seek(0) + iTunesSD_file.write(b"\0%c%c" % (total_count >> 8, total_count & 0xFF)) + iTunesSD_file.close() except IOError: log("ERROR: Some strange errors occured while writing iTunesSD.") log(" You may have to re-initialize the iPod using iTunes.") sys.exit(1) - if make_playback_state(Options['volume']) * \ - make_stats(total_count) * \ - make_shuffle(total_count): + if write_playback_state() * \ + write_stats(total_count) * \ + write_shuffle(total_count): log() log("The iPod shuffle database was rebuilt successfully.") log("Have fun listening to your music!") @@ -593,75 +541,29 @@ Please make sure that: ################################################################################ - -def help(): - print("Usage: %s [OPTION]... [DIRECTORY]..." % sys.argv[0]) - print("""Rebuild iPod shuffle database. - -Mandatory arguments to long options are mandatory for short options too. - -h, --help display this help text - -i, --interactive prompt before browsing each directory - -v, --volume=VOL set playback volume to a value between 0 and 38 - -s, --nosmart do not use smart shuffle - -n, --nochdir do not change directory to this scripts directory first - -l, --nolog do not create a log file - -f, --force always rebuild database entries, do not re-use old ones - -L, --logfile set log file name - -Must be called from the iPod's root directory. By default, the whole iPod is -searched for playable files, unless at least one DIRECTORY is specified.""") - - -def opterr(msg): - print("parse error:", msg) - print("use `%s -h' to get help" % sys.argv[0]) - sys.exit(1) - - -def parse_options(): - try: - opts, args = getopt.getopt(sys.argv[1:], "hiv:snlfL:r", \ - ["help", "interactive", "volume=", "nosmart", "nochdir", "nolog", "force", - "logfile=", "rename"]) - except getopt.GetoptError as message: - opterr(message) - sys.exit(1) - - for opt, arg in opts: - if opt in ("-h", "--help"): - help() - sys.exit(0) - elif opt in ("-i", "--interactive"): - Options['interactive'] = True - elif opt in ("-v", "--volume"): - try: - Options['volume'] = int(arg) - except ValueError: - opterr("invalid volume") - elif opt in ("-s", "--nosmart"): - Options['smart'] = False - elif opt in ("-n", "--nochdir"): - Options['home'] = False - elif opt in ("-l", "--nolog"): - Options['logging'] = False - elif opt in ("-f", "--force"): - Options['reuse'] = 0 - elif opt in ("-L", "--logfile"): - Options['logfile'] = arg - elif opt in ("-r", "--rename"): - Options['rename'] = True - return args - - -################################################################################ - - if __name__ == "__main__": - args = parse_options() - go_home() + import argparse + + parser = argparse.ArgumentParser(description="Rebuild iPod shuffle database.") + parser.add_argument('directories', nargs='*', default=["."]) + parser.add_argument("-i", "--interactive", action="store_true", help = "prompt before browsing each directory") + parser.add_argument("-s", "--nosmart", action="store_true", help = "do not use smart shuffle") + parser.add_argument("-n", "--nochdir", action="store_true", help = "do not change directory to this scripts directory first") + parser.add_argument("-l", "--nolog", action="store_true", help = "do not create a log file") + parser.add_argument("-f", "--force", action="store_true", help = "always rebuild database entries, do not re-use old ones") + parser.add_argument("-L", "--logfile", action="store", default="rebuild_db.log.txt", help = "set log file name") + parser.add_argument("-r", "--rename", action="store_true", help = "automatically rename files to safe names") + parser.add_argument("-v", "--volume", type=int, action="store", help = "set playback volume to a value between 0 and 38") + + args = parser.parse_args() + + if not args.nochdir: + try: os.chdir(os.path.abspath(__file__)) + except OSError: pass + open_log() try: - main(args) + main(args.directories) except KeyboardInterrupt: log() log("You decided to cancel processing. This is OK, but please note that")