use argparse + some minor cleanups

This commit is contained in:
lynxize 2025-06-20 16:17:38 -06:00
parent 503dd069c8
commit 141f6bdeb1
Signed by: lynxize
GPG key ID: 8615849B8532CD77

View file

@ -1,5 +1,4 @@
#!/usr/bin/env python
import functools
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
@ -67,7 +66,9 @@ __email__ = "martin.fiedler@gmx.net"
* initial public release, Win32 only
"""
import sys, os, os.path, array, getopt, random, fnmatch, operator, string
import functools, sys, os, os.path, array, random, fnmatch, operator, string
from io import BytesIO
from typing import TextIO
# @formatter:off
KnownProps = ('filename', 'size', 'ignore', 'type', 'shuffle', 'reuse', 'bookmark')
@ -83,69 +84,43 @@ Rules = [
]
# @formatter:on
Options = {
"volume": None,
"interactive": False,
"smart": True,
"home": True,
"logging": True,
"reuse": 1,
"logfile": "rebuild_db.log.txt",
"rename": False
}
domains = []
total_count = 0
KnownEntries = {}
logfile: TextIO | None = None
args = None
iTunesSD_file: BytesIO | None = None
################################################################################
def open_log():
global logfile
if Options['logging']:
try:
logfile = open(Options['logfile'], "w")
except IOError:
logfile = None
else:
logfile = None
if not args.nolog:
try: logfile = open(args.logfile, "w")
except IOError:logfile = None
else: logfile = None
def log(line="", newline=True):
global logfile
if newline:
print(line)
line += "\n"
else:
print(line, end=' ')
line += " "
if logfile:
try:
logfile.write(line)
except IOError:
pass
if newline: line += "\n"
else: line += " "
print(line, end="")
if logfile: logfile.write(line)
def close_log():
global logfile
if logfile:
logfile.close()
def go_home():
if Options['home']:
try:
os.chdir(os.path.split(sys.argv[0])[0])
except OSError:
pass
if logfile: logfile.close()
def filesize(filename):
try:
return os.stat(filename)[6]
except OSError:
return None
try: return os.stat(filename)[6]
except OSError: return None
################################################################################
@ -156,25 +131,18 @@ def MatchRule(props, rule):
prop, op, ref = props[rule[0]], rule[1], rule[2]
except KeyError:
return False
if rule[1] == '~':
return fnmatch.fnmatchcase(prop.lower(), ref.lower())
elif rule[1] == '=':
return prop == ref
elif rule[1] == '>':
return prop > ref
elif rule[1] == '<':
return prop < ref
else:
return False
if rule[1] == '~': return fnmatch.fnmatchcase(prop.lower(), ref.lower())
elif rule[1] == '=': return prop == ref
elif rule[1] == '>': return prop > ref
elif rule[1] == '<': return prop < ref
else: return False
def ParseValue(val):
if len(val) >= 2 and ((val[0] == "'" and val[-1] == "'") or (val[0] == '"' and val[-1] == '"')):
return val[1:-1]
try:
return int(val)
except ValueError:
return val
try: return int(val)
except ValueError: return val
def ParseRule(rule):
@ -182,14 +150,14 @@ def ParseRule(rule):
prop = rule[:sep_pos].strip()
if not prop in KnownProps:
log("WARNING: unknown property `%s'" % prop)
return (prop, rule[sep_pos], ParseValue(rule[sep_pos + 1:].strip()))
return prop, rule[sep_pos], ParseValue(rule[sep_pos + 1:].strip())
def ParseAction(action):
prop, value = list(map(str.strip, action.split('=', 1)))
if not prop in KnownProps:
log("WARNING: unknown property `%s'" % prop)
return (prop, ParseValue(value))
return prop, ParseValue(value)
def ParseRuleLine(line):
@ -202,9 +170,9 @@ def ParseRuleLine(line):
ruleset = list(map(str.strip, ":".join(tmp[:-1]).split(",")))
actions = dict(list(map(ParseAction, tmp[-1].split(","))))
if len(ruleset) == 1 and not (ruleset[0]):
return ([], actions)
return [], actions
else:
return (list(map(ParseRule, ruleset)), actions)
return list(map(ParseRule, ruleset)), actions
except OSError: # (ValueError,IndexError,KeyError):
log("WARNING: rule `%s' is malformed, ignoring" % line)
return None
@ -214,9 +182,8 @@ def ParseRuleLine(line):
def safe_char(c):
if c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_":
return c
return "_"
if c in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_": return c
else: return "_"
def rename_safely(path, name):
@ -238,7 +205,7 @@ def rename_safely(path, name):
def write_to_db(filename):
global iTunesSD, domains, total_count, KnownEntries, Rules
global iTunesSD_file, domains, total_count, KnownEntries, Rules
# set default properties
props = {
@ -247,7 +214,7 @@ def write_to_db(filename):
'ignore': 0,
'type': 1,
'shuffle': 1,
'reuse': Options['reuse'],
'reuse': not args.force,
'bookmark': 0
}
@ -266,7 +233,7 @@ def write_to_db(filename):
"\0" * (525 - 2 * len(filename))).encode()
# write entry, modifying shuffleflag and bookmarkflag at least
iTunesSD.write(entry[:555] + bytes([props['shuffle']]) + bytes([props['bookmark']]) + bytes([entry[557]]))
iTunesSD_file.write(entry[:555] + bytes([props['shuffle']]) + bytes([props['bookmark']]) + bytes([entry[557]]))
if props['shuffle']: domains[-1].append(total_count)
total_count += 1
return 1
@ -281,21 +248,18 @@ def make_key(s):
for j in range(i, len(s)):
if not s[j].isdigit(): break
if s[j].isdigit(): j += 1
return (s[:i], int(s[i:j]), make_key(s[j:]))
return s[:i], int(s[i:j]), make_key(s[j:])
def key_repr(x):
if type(x) == tuple:
return b"%s%d%s" % (x[0], x[1], key_repr(x[2]))
else:
return x
if type(x) == tuple: return b"%s%d%s" % (x[0], x[1], key_repr(x[2]))
else: return x
def cmp_key(a, b):
if type(a) == tuple and type(b) == tuple:
return cmp(a[0], b[0]) or cmp(a[1], b[1]) or cmp_key(a[2], b[2])
else:
return cmp(key_repr(a), key_repr(b))
else: return cmp(key_repr(a), key_repr(b))
def cmp(a, b):
if a < b: return -1
@ -306,7 +270,7 @@ def cmp(a, b):
def file_entry(path, name, prefix=""):
if not name or name[0] == ".": return None
fullname = "%s/%s" % (path, name)
may_rename = not (fullname.startswith("./iPod_Control")) and Options['rename']
may_rename = not (fullname.startswith("./iPod_Control")) and args.rename
try:
if os.path.islink(fullname):
return None
@ -321,32 +285,26 @@ def file_entry(path, name, prefix=""):
return None
def browse(path, interactive):
def browse(path: string, interactive: bool):
global domains
if path[-1] == "/": path = path[:-1]
displaypath = path[1:]
if not displaypath: displaypath = "/"
if interactive:
while 1:
try:
choice = input("include `%s'? [(Y)es, (N)o, (A)ll] " % displaypath)[:1].lower()
except EOFError:
raise KeyboardInterrupt
if not choice: continue
if choice in "at": # all/alle/tous/<dontknow>
interactive = 0
break
if choice in "yjos": # yes/ja/oui/si
break
if choice in "n": # no/nein/non/non?
return 0
while interactive:
choice = input("include `%s'? [(Y)es, (N)o, (A)ll] " % displaypath)[:1].lower()
if not choice: continue
try:
files = [_f for _f in [file_entry(path, name) for name in os.listdir(path)] if _f]
except OSError:
return
# all/alle/tous/<dontknow>
if choice in "at": interactive = 0
# yes/ja/oui/si
if choice in "yjos": break
# no/nein/non/non?
if choice in "n": return
try: files = [_f for _f in [file_entry(path, name) for name in os.listdir(path)] if _f]
except OSError: return
if path == "./iPod_Control/Music":
subdirs = [x[2] for x in files if not x[0]]
@ -354,10 +312,10 @@ def browse(path, interactive):
for dir in subdirs:
subpath = "%s/%s" % (path, dir)
try:
files.extend(
[x for x in [file_entry(subpath, name, dir + "/") for name in os.listdir(subpath)] if x and x[0]])
except OSError:
pass
files.extend([x for x in [
file_entry(subpath, name, dir + "/") for name in os.listdir(subpath)
] if x and x[0]])
except OSError: pass
files.sort(key = functools.cmp_to_key(cmp_key))
count = len([None for x in files if x[0]])
@ -371,47 +329,43 @@ def browse(path, interactive):
else:
browse(fullname, interactive)
if real_count == count:
log("%s: %d files" % (displaypath, count))
else:
log("%s: %d files (out of %d)" % (displaypath, real_count, count))
log("%s: %d files (out of %d)" % (displaypath, real_count, count))
################################################################################
def stringval(i):
def stringval(i: int) -> bytes:
if i < 0: i += 0x1000000
return b"%c%c%c" % (i & 0xFF, (i >> 8) & 0xFF, (i >> 16) & 0xFF)
def listval(i):
def listval(i: int) -> list[int]:
if i < 0: i += 0x1000000
return [i & 0xFF, (i >> 8) & 0xFF, (i >> 16) & 0xFF]
def make_playback_state(volume=None):
def write_playback_state():
# I'm not at all proud of this function. Why can't stupid Python make strings
# mutable?!
log("Setting playback state ...", False)
PState = []
p_state = []
try:
f = open("iPod_Control/iTunes/iTunesPState", "rb")
a = array.array('B')
a.frombytes(f.read())
PState = a.tolist()
p_state = a.tolist()
f.close()
except IOError:
del PState[:]
del p_state[:]
#if len(PState) != 21:
# print("catstare")
# PState = listval(29) + [0] * 15 + listval(1) # volume 29, FW ver 1.0
PState[3:15] = [0] * 6 + [1] + [0] * 5 # track 0, shuffle mode, start of track
if volume is not None:
PState[:3] = listval(volume)
p_state[3:15] = [0] * 6 + [1] + [0] * 5 # track 0, shuffle mode, start of track
if args.volume is not None:
p_state[:3] = listval(args.volume)
try:
f = open("iPod_Control/iTunes/iTunesPState", "wb")
array.array('B', PState).tofile(f)
array.array('B', p_state).tofile(f)
f.close()
except IOError:
log("FAILED.")
@ -420,7 +374,7 @@ def make_playback_state(volume=None):
return 1
def make_stats(count):
def write_stats(count):
log("Creating statistics file ...", False)
try:
file = open("iPod_Control/iTunes/iTunesStats", "wb")
@ -437,10 +391,9 @@ def make_stats(count):
def smart_shuffle():
try:
slice_count = max(list(map(len, domains)))
except ValueError:
return []
try: slice_count = max(list(map(len, domains)))
except ValueError: return []
slices = [[] for x in range(slice_count)]
slice_fill = [0] * slice_count
@ -451,7 +404,8 @@ def smart_shuffle():
# find slices where the nearest track of the same domain is far away
metric = [
min([slice_count] + [min(abs(s - u), abs(s - u + slice_count), abs(s - u - slice_count)) for u in used])
for s in range(slice_count)]
for s in range(slice_count)
]
thresh = (max(metric) + 1) // 2
farthest = [s for s in range(slice_count) if metric[s] >= thresh]
@ -461,7 +415,7 @@ def smart_shuffle():
# choose one of the remaining candidates and add the track to the chosen slice
s = random.choice(emptiest or farthest)
slices[s].append((n, d))
slices[s].append([n, d])
slice_fill[s] += 1
used.append(s)
@ -477,15 +431,15 @@ def smart_shuffle():
return seq
def make_shuffle(count):
def write_shuffle(count):
random.seed()
if Options['smart']:
log("Generating smart shuffle sequence ...", False)
seq = smart_shuffle()
else:
if args.nosmart:
log("Generating shuffle sequence ...", False)
seq = list(range(count))
random.shuffle(seq)
else:
log("Generating smart shuffle sequence ...", False)
seq = smart_shuffle()
try:
with open("iPod_Control/iTunes/iTunesShuffle", "wb") as file:
file.write(b"".join(map(stringval, seq)))
@ -500,7 +454,7 @@ def make_shuffle(count):
def main(dirs):
global header, iTunesSD, total_count, KnownEntries, Rules
global header, iTunesSD_file, total_count, KnownEntries, Rules
log("Welcome to %s, version %s" % (__title__, __version__))
log()
@ -519,29 +473,26 @@ Please make sure that:
sys.exit(1)
header = array.array('B')
iTunesSD = None
try:
iTunesSD = open("iPod_Control/iTunes/iTunesSD", "rb")
header.fromfile(iTunesSD, 51)
if Options['reuse']:
iTunesSD.seek(18)
entry = iTunesSD.read(558)
iTunesSD_file = open("iPod_Control/iTunes/iTunesSD", "rb")
header.fromfile(iTunesSD_file, 51)
if not args.force:
iTunesSD_file.seek(18)
entry = iTunesSD_file.read(558)
while len(entry) == 558:
filename = entry[33::2].split(b"\0", 1)[0]
KnownEntries[filename] = entry
entry = iTunesSD.read(558)
entry = iTunesSD_file.read(558)
except (IOError, EOFError):
pass
if iTunesSD: iTunesSD.close()
if iTunesSD_file: iTunesSD_file.close()
if len(header) == 51:
log("Using iTunesSD headers from existing database.")
if KnownEntries:
log("Collected %d entries from existing database." % len(KnownEntries))
if KnownEntries: log("Collected %d entries from existing database." % len(KnownEntries))
else:
del header[18:]
if len(header) == 18:
log("Using iTunesSD main header from existing database.")
if len(header) == 18: log("Using iTunesSD main header from existing database.")
else:
del header[:]
log("Rebuilding iTunesSD main header from scratch.")
@ -551,8 +502,8 @@ Please make sure that:
log()
try:
iTunesSD = open("iPod_Control/iTunes/iTunesSD", "wb")
header[:18].tofile(iTunesSD)
iTunesSD_file = open("iPod_Control/iTunes/iTunesSD", "wb")
header[:18].tofile(iTunesSD_file)
except IOError:
log("""ERROR: Cannot write to the iPod database file (iTunesSD)!
Please make sure that:
@ -563,25 +514,22 @@ Please make sure that:
log("Searching for files on your iPod.")
try:
if dirs:
for dir in dirs:
browse("./" + dir, Options['interactive'])
else:
browse(".", Options['interactive'])
for dir in dirs:
browse("./" + dir, args.interactive)
log("%d playable files were found on your iPod." % total_count)
log()
log("Fixing iTunesSD header.")
iTunesSD.seek(0)
iTunesSD.write(b"\0%c%c" % (total_count >> 8, total_count & 0xFF))
iTunesSD.close()
iTunesSD_file.seek(0)
iTunesSD_file.write(b"\0%c%c" % (total_count >> 8, total_count & 0xFF))
iTunesSD_file.close()
except IOError:
log("ERROR: Some strange errors occured while writing iTunesSD.")
log(" You may have to re-initialize the iPod using iTunes.")
sys.exit(1)
if make_playback_state(Options['volume']) * \
make_stats(total_count) * \
make_shuffle(total_count):
if write_playback_state() * \
write_stats(total_count) * \
write_shuffle(total_count):
log()
log("The iPod shuffle database was rebuilt successfully.")
log("Have fun listening to your music!")
@ -593,75 +541,29 @@ Please make sure that:
################################################################################
def help():
print("Usage: %s [OPTION]... [DIRECTORY]..." % sys.argv[0])
print("""Rebuild iPod shuffle database.
Mandatory arguments to long options are mandatory for short options too.
-h, --help display this help text
-i, --interactive prompt before browsing each directory
-v, --volume=VOL set playback volume to a value between 0 and 38
-s, --nosmart do not use smart shuffle
-n, --nochdir do not change directory to this scripts directory first
-l, --nolog do not create a log file
-f, --force always rebuild database entries, do not re-use old ones
-L, --logfile set log file name
Must be called from the iPod's root directory. By default, the whole iPod is
searched for playable files, unless at least one DIRECTORY is specified.""")
def opterr(msg):
print("parse error:", msg)
print("use `%s -h' to get help" % sys.argv[0])
sys.exit(1)
def parse_options():
try:
opts, args = getopt.getopt(sys.argv[1:], "hiv:snlfL:r", \
["help", "interactive", "volume=", "nosmart", "nochdir", "nolog", "force",
"logfile=", "rename"])
except getopt.GetoptError as message:
opterr(message)
sys.exit(1)
for opt, arg in opts:
if opt in ("-h", "--help"):
help()
sys.exit(0)
elif opt in ("-i", "--interactive"):
Options['interactive'] = True
elif opt in ("-v", "--volume"):
try:
Options['volume'] = int(arg)
except ValueError:
opterr("invalid volume")
elif opt in ("-s", "--nosmart"):
Options['smart'] = False
elif opt in ("-n", "--nochdir"):
Options['home'] = False
elif opt in ("-l", "--nolog"):
Options['logging'] = False
elif opt in ("-f", "--force"):
Options['reuse'] = 0
elif opt in ("-L", "--logfile"):
Options['logfile'] = arg
elif opt in ("-r", "--rename"):
Options['rename'] = True
return args
################################################################################
if __name__ == "__main__":
args = parse_options()
go_home()
import argparse
parser = argparse.ArgumentParser(description="Rebuild iPod shuffle database.")
parser.add_argument('directories', nargs='*', default=["."])
parser.add_argument("-i", "--interactive", action="store_true", help = "prompt before browsing each directory")
parser.add_argument("-s", "--nosmart", action="store_true", help = "do not use smart shuffle")
parser.add_argument("-n", "--nochdir", action="store_true", help = "do not change directory to this scripts directory first")
parser.add_argument("-l", "--nolog", action="store_true", help = "do not create a log file")
parser.add_argument("-f", "--force", action="store_true", help = "always rebuild database entries, do not re-use old ones")
parser.add_argument("-L", "--logfile", action="store", default="rebuild_db.log.txt", help = "set log file name")
parser.add_argument("-r", "--rename", action="store_true", help = "automatically rename files to safe names")
parser.add_argument("-v", "--volume", type=int, action="store", help = "set playback volume to a value between 0 and 38")
args = parser.parse_args()
if not args.nochdir:
try: os.chdir(os.path.abspath(__file__))
except OSError: pass
open_log()
try:
main(args)
main(args.directories)
except KeyboardInterrupt:
log()
log("You decided to cancel processing. This is OK, but please note that")