ShuffleDB/rebuild_db.py

563 lines
19 KiB
Python
Executable file

#!/usr/bin/env python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
__version__ = "2.0-pre1"
""" VERSION HISTORY
2.0-pre1 (2025-06-20)
* updated to python 3
* removed --nochdir flag
1.0-rc1 (2006-04-26)
* finally(!) added the often-requested auto-rename feature (-r option)
0.7-pre1 (2005-06-09)
* 0.6-final was skipped because of the huge load of new (experimental)
features in this version :)
* rule files allow for nice and flexible fine tuning
* fixed --nochdir and --nosmart bugs
* numerical sorting (i.e. "track2.mp3" < "track10.mp3")
* iTunesSD entries are now copied over from the old file if possible; this
should preserve the keys for .aa files
0.6-pre2 (2005-05-02)
* fixed file type bug (thanks to Nowhereman)
* improved audio book support (thanks to Nowhereman)
* files called $foo.book.$type (like example.book.mp3) are stored as
audio books now
* the subdirectories of /iPod_Control/Music are merged as if they were a
single directory
0.6-pre1 (2005-04-23)
* always starts from the directory of the executable now
* output logging
* generating iTunesPState and iTunesStats so that the iPod won't overwrite
iTunesShuffle anymore
* -> return of smart shuffle ;)
* directory display order is identical to playback order now
* command line options and help
* interactive mode, configurable playback volume, directory limitation
0.5 (2005-04-15)
* major code refactoring (thanks to Andre Kloss)
* removed "smart shuffle" again -- the iPod deleted the file anyway :(
* common errors are now reported more concisely
* dot files (e.g. ".hidden_file") are now ignored while browsing the iPod
0.4 (2005-03-20)
* fixed iPod crashes after playing the shuffle playlist to the end
* fixed incorrect database entries for non-MP3 files
0.3 (2005-03-18)
* Python version now includes a "smart shuffle" feature
0.2 (2005-03-15)
* added Python version
0.1 (2005-03-13)
* initial public release, Win32 only
"""
import functools, sys, os, os.path, array, random, fnmatch, operator, string
from builtins import map, input, range
from io import BytesIO
from typing import TextIO
# @formatter:off
# Property names that rule conditions and actions may use; ParseRule() and
# ParseAction() log a warning for any other name.
KnownProps = ('filename', 'size', 'ignore', 'type', 'shuffle', 'reuse', 'bookmark')
# Built-in rules as (conditions, action) pairs. write_to_db() walks this list in
# order and merges the action dict of every rule whose conditions all match into
# the file's properties, so later rules override earlier ones. main() appends
# the rules parsed from "rebuild_db.rules" to this list.
Rules = [
([('filename', '~', '*.mp3')], {'type': 1, 'shuffle': 1, 'bookmark': 0}),
([('filename', '~', '*.m4?')], {'type': 2, 'shuffle': 1, 'bookmark': 0}),
([('filename', '~', '*.m4b')], { 'shuffle': 0, 'bookmark': 1}),
([('filename', '~', '*.aa')], {'type': 1, 'shuffle': 0, 'bookmark': 1, 'reuse': 1}),
([('filename', '~', '*.wav')], {'type': 4, 'shuffle': 0, 'bookmark': 0}),
([('filename', '~', '*.book.???')], { 'shuffle': 0, 'bookmark': 1}),
([('filename', '~', '*.announce.???')], { 'shuffle': 0, 'bookmark': 0}),
([('filename', '~', '/recycled/*')], {'ignore': 1}),
]
# @formatter:on
# Lists of track indices used by smart_shuffle(); browse() appends a new list
# for each directory that has audio files and write_to_db() fills the last one.
domains = []
# Number of entries written to iTunesSD so far (incremented by write_to_db()).
total_count = 0
# Entries read from the existing iTunesSD by main(), keyed by file name.
KnownEntries = {}
# Open log file, or None when logging is disabled or the file cannot be opened.
logfile: TextIO | None = None
# Parsed command-line arguments; assigned in the __main__ section.
args = None
# Handle of the iTunesSD database file while main() has it open.
iTunesSD_file: BytesIO | None = None
################################################################################
def open_log():
    """Open the log file named by ``args.logfile`` unless logging is disabled.

    The handle is stored in the module-level ``logfile``. It is set to
    ``None`` when ``args.nolog`` is given or when the file cannot be opened,
    so that log() only prints to the console.
    """
    global logfile
    if args.nolog:
        logfile = None
        return
    try:
        handle = open(args.logfile, "w")
    except IOError:
        handle = None
    logfile = handle
def log(line="", newline=True):
    """Print *line* and mirror it into the log file, if one is open.

    With ``newline`` true the text is terminated by a line break; otherwise a
    single space is appended so that a later message continues the same line.
    """
    global logfile
    text = line + ("\n" if newline else " ")
    print(text, end="")
    if logfile:
        logfile.write(text)
def close_log():
    """Close the log file if open_log() managed to open one."""
    global logfile
    if not logfile:
        return
    logfile.close()
def filesize(filename):
    """Return the size of *filename* in bytes, or ``None`` if it cannot be stat'ed."""
    try:
        info = os.stat(filename)
    except OSError:
        return None
    return info.st_size
################################################################################
def MatchRule(props, rule):
    """Evaluate one rule condition against a file's properties.

    *props* maps property names to values; *rule* is a
    ``(property, operator, reference)`` triple as produced by ParseRule().

    Supported operators:
      ``~``  case-insensitive shell-style wildcard match (string properties only)
      ``=``  equality
      ``>``  greater than
      ``<``  less than

    Returns ``False`` when the property is unknown, the operator is unknown,
    or the two operands cannot be compared (for example a size of ``None``
    against a number) -- a condition that cannot be evaluated never matches.
    """
    try:
        prop, op, ref = props[rule[0]], rule[1], rule[2]
    except KeyError:
        return False
    if op == '~':
        # Wildcards only make sense on text. The reference may have been
        # turned into an int by ParseValue(), so convert it back to text.
        if not isinstance(prop, str):
            return False
        return fnmatch.fnmatchcase(prop.lower(), str(ref).lower())
    try:
        if op == '=':
            return prop == ref
        if op == '>':
            return prop > ref
        if op == '<':
            return prop < ref
    except TypeError:
        # Python 3 refuses to order unrelated types (None vs. int, str vs. int).
        return False
    return False
def ParseValue(val):
    """Convert the textual value of a rule or action into a Python value.

    A value wrapped in matching single or double quotes is returned without
    the quotes; anything that parses as an integer is returned as ``int``;
    every other value is returned unchanged.
    """
    is_quoted = len(val) >= 2 and val[0] == val[-1] and val[0] in ("'", '"')
    if is_quoted:
        return val[1:-1]
    try:
        number = int(val)
    except ValueError:
        return val
    return number
def ParseRule(rule):
    """Split one condition such as ``filename ~ "*.mp3"`` into its parts.

    The operator is the first of ``~``, ``=``, ``<``, ``>`` that occurs after
    the first character. Returns ``(property, operator, value)`` with the
    value converted by ParseValue(). Unknown property names are reported
    with a warning but still returned. Raises ``ValueError`` when the text
    contains no operator.
    """
    candidates = [pos for pos in map(rule.find, "~=<>") if pos > 0]
    sep_pos = min(candidates)
    prop = rule[:sep_pos].strip()
    if prop not in KnownProps:
        log(f"WARNING: unknown property '{prop}'")
    operand = rule[sep_pos + 1:].strip()
    return prop, rule[sep_pos], ParseValue(operand)
def ParseAction(action):
    """Split one ``property = value`` assignment of a rule's action list.

    Returns ``(property, value)`` with the value converted by ParseValue().
    Unknown property names are reported with a warning but still returned.
    Raises ``ValueError`` when the text contains no ``=``.
    """
    prop, value = (part.strip() for part in action.split('=', 1))
    if prop not in KnownProps:
        log(f"WARNING: unknown property '{prop}'")
    return prop, ParseValue(value)
def ParseRuleLine(line):
    """Parse one line of a rules file.

    A rule line has the form ``condition, condition, ...: prop=value, ...``.
    Everything after the last ``:`` is the action list; the part before it is
    a comma-separated list of conditions, which may be empty (the actions
    then apply to every file).

    Returns ``(conditions, actions)`` where *conditions* is a list of
    ParseRule() triples and *actions* is a dict, or ``None`` for blank lines,
    comment lines (starting with ``#``) and malformed lines. Malformed lines
    are reported with a warning instead of aborting the program.
    """
    line = line.strip()
    if not line or line[0] == "#":
        return None
    try:
        # split line into "ruleset: action"
        conditions_text, _, actions_text = line.rpartition(":")
        ruleset = [cond.strip() for cond in conditions_text.split(",")]
        actions = dict(map(ParseAction, actions_text.split(",")))
        if len(ruleset) == 1 and not ruleset[0]:
            return [], actions
        return list(map(ParseRule, ruleset)), actions
    except (ValueError, IndexError, KeyError):
        # ParseRule()/ParseAction() raise ValueError when an operator or '='
        # is missing; catching OSError here would never see those errors.
        log(f"WARNING: rule '{line}' is malformed, ignoring")
        return None
################################################################################
def safe_char(c):
    """Return *c* if it is an ASCII letter, digit, ``-`` or ``_``; otherwise ``_``."""
    allowed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_"
    return c if c in allowed else "_"
def rename_safely(path, name):
    """Rename ``path/name`` so that its base name contains only safe characters.

    Every character of the base name that safe_char() rejects is replaced by
    ``_``; the extension is kept as it is. If the sanitized name is already
    taken, ``_0``, ``_1``, ... is appended until a free name is found.

    Returns the name the entry has on disk afterwards: the new name when the
    rename succeeded, or the original *name* when nothing had to change or
    the rename failed. Returning the original name on failure keeps callers
    from recording a path that does not exist.
    """
    base, ext = os.path.splitext(name)
    newname = ''.join(map(safe_char, base))
    if name == newname + ext:
        return name
    if os.path.exists(f"{path}/{newname}{ext}"):
        i = 0
        while os.path.exists(f"{path}/{newname}_{i}{ext}"):
            i += 1
        newname += f"_{i}"
    newname += ext
    try:
        os.rename(f"{path}/{name}", f"{path}/{newname}")
    except OSError:
        # don't fail if the rename didn't work -- the file keeps its old name
        return name
    return newname
def write_to_db(filename):
    """Append the database entry for *filename* to the open iTunesSD file.

    *filename* is the path as stored in the database (with a leading ``/``);
    the same path without the leading slash is used to look up the file size.

    The file's properties start from defaults and are then updated by every
    rule in ``Rules`` whose conditions all match. Files whose ``ignore``
    property ends up true are skipped. When ``reuse`` is set and the old
    database had an entry for this file, that entry is copied; otherwise a
    new 558-byte entry is built from the entry header template in the
    module-level ``header`` followed by the file name field.

    Returns 1 if an entry was written and 0 if the file was ignored.
    """
    global iTunesSD_file, domains, total_count, KnownEntries, Rules
    # set default properties
    props = {
        'filename': filename,
        'size': filesize(filename[1:]),
        'ignore': 0,
        'type': 1,
        'shuffle': 1,
        'reuse': not args.force,
        'bookmark': 0
    }
    # check and apply rules
    for ruleset, action in Rules:
        if all(MatchRule(props, rule) for rule in ruleset):
            props.update(action)
    if props['ignore']:
        return 0
    # retrieve entry from known entries or rebuild it
    entry = KnownEntries.get(filename) if props['reuse'] else None
    if not entry:
        header[29] = props['type']
        # The name field is 522 bytes of two-byte characters (UTF-16-LE),
        # followed by three more bytes. Encode first and cut/pad the *bytes*,
        # so that long or non-ASCII names still give a 558-byte entry.
        name_field = filename.encode("utf-16-le")[:522]
        entry = header.tobytes() + name_field.ljust(525, b"\0")
    # write entry, modifying shuffleflag and bookmarkflag at least
    flags = bytes([props['shuffle'], props['bookmark'], entry[557]])
    iTunesSD_file.write(entry[:555] + flags)
    if props['shuffle']:
        domains[-1].append(total_count)
    total_count += 1
    return 1
def make_key(s):
    """Build a sort key that orders embedded numbers numerically.

    The name is lower-cased. If it contains no digit, the lower-cased string
    itself is the key. Otherwise the key is the tuple
    ``(text before the first digit run, value of that run, key of the rest)``,
    so that e.g. "track2" sorts before "track10". Empty input is returned
    unchanged.
    """
    if not s:
        return s
    lowered = s.lower()
    start = next((pos for pos, ch in enumerate(lowered) if ch.isdigit()), None)
    if start is None:
        return lowered
    end = start
    while end < len(lowered) and lowered[end].isdigit():
        end += 1
    return lowered[:start], int(lowered[start:end]), make_key(lowered[end:])
def key_repr(x):
    """Flatten a key built by make_key() back into a plain string.

    Tuple keys ``(prefix, number, rest)`` are joined recursively; any other
    value is returned unchanged. Text formatting is used because the key
    parts are ``str`` (bytes formatting rejects ``str`` operands).
    """
    if isinstance(x, tuple):
        return "%s%d%s" % (x[0], x[1], key_repr(x[2]))
    return x


def cmp_key(a, b):
    """Three-way comparison for sort entries and make_key() keys.

    When both operands are 3-tuples they are compared element by element,
    recursing for every element so that nested keys of different shapes are
    handled. When only one operand is a tuple key, both are flattened with
    key_repr() and compared as strings; Python 3 cannot order a ``str``
    against a ``tuple`` directly.

    Returns -1, 0 or 1.
    """
    if isinstance(a, tuple) and isinstance(b, tuple):
        return cmp_key(a[0], b[0]) or cmp_key(a[1], b[1]) or cmp_key(a[2], b[2])
    return cmp(key_repr(a), key_repr(b))


def cmp(a, b):
    """Return -1, 0 or 1 as *a* is less than, equal to or greater than *b*."""
    if a < b:
        return -1
    if a > b:
        return 1
    return 0
def file_entry(path, name, prefix=""):
    """Describe one directory entry for browse().

    Returns ``(kind, sort_key, prefix + name)`` where *kind* is 0 for a
    directory and 1 for a file with one of the recognized audio extensions.
    Returns ``None`` for empty names, dot files, symbolic links, other file
    types, and entries that cause an ``OSError``.

    When ``args.rename`` is set and the entry is not below
    ``./iPod_Control``, the entry is first renamed via rename_safely() and
    the resulting name is used.
    """
    if not name or name.startswith("."):
        return None
    fullname = f"{path}/{name}"
    may_rename = not fullname.startswith("./iPod_Control") and args.rename
    audio_extensions = (".mp3", ".m4a", ".m4b", ".m4p", ".aa", ".wav")
    try:
        if os.path.islink(fullname):
            return None
        if os.path.isdir(fullname):
            kind = 0
        elif os.path.splitext(name)[1].lower() in audio_extensions:
            kind = 1
        else:
            return None
        if may_rename:
            name = rename_safely(path, name)
        return kind, make_key(name), prefix + name
    except OSError:
        return None
def browse(path: str, interactive: bool):
    """Recursively add the audio files below *path* to the database.

    *path* is expected to be relative and to start with ``.``; the part
    after that first character is the name shown to the user (``/`` for the
    root itself). In interactive mode the user is asked whether to include
    the directory; answering "all" turns the prompts off for everything
    below it.

    Entries are sorted with cmp_key(), which puts directories before files.
    The sub-directories of ``./iPod_Control/Music`` are merged as if they
    were a single directory. A log line with the number of files written is
    emitted for each directory.
    """
    global domains
    if path.endswith("/"):
        path = path[:-1]
    # The root directory ("." -> "") would otherwise have no visible name.
    displaypath = path[1:] or "/"
    while interactive:
        choice = input(f"include '{displaypath}'? [(Y)es, (N)o, (A)ll] ")[:1].lower()
        if not choice:
            continue
        # all/alle/tous/<dontknow>
        if choice in "at":
            interactive = False
        # yes/ja/oui/si
        if choice in "yjos":
            break
        # no/nein/non/non?
        if choice in "n":
            return
    try:
        files = [entry for entry in [file_entry(path, name) for name in os.listdir(path)] if entry]
    except OSError:
        return
    if path == "./iPod_Control/Music":
        # Replace the sub-directories by the files they contain.
        subdirs = [entry[2] for entry in files if not entry[0]]
        files = [entry for entry in files if entry[0]]
        for subdir in subdirs:
            subpath = f"{path}/{subdir}"
            try:
                files.extend([entry for entry in [
                    file_entry(subpath, name, subdir + "/") for name in os.listdir(subpath)
                ] if entry and entry[0]])
            except OSError:
                pass
    files.sort(key=functools.cmp_to_key(cmp_key))
    count = len([None for entry in files if entry[0]])
    if count:
        domains.append([])
    real_count = 0
    # NOTE(review): directories sort first, so nested browse() calls may append
    # their own domains before this directory's files are written; those files
    # then land in whatever domain is last -- confirm this is intended.
    for item in files:
        fullname = f"{path}/{item[2]}"
        if item[0]:
            real_count += write_to_db(fullname[1:])
        else:
            browse(fullname, interactive)
    log(f"{displaypath}: {real_count} files (out of {count})")
################################################################################
def stringval(i: int) -> bytes:
    """Encode *i* as three bytes, least significant byte first.

    Negative values are first shifted up by 0x1000000 (24-bit wrap-around).
    """
    value = i + 0x1000000 if i < 0 else i
    return bytes((value >> shift) & 0xFF for shift in (0, 8, 16))
def listval(i: int) -> list[int]:
    """Return *i* as a list of three byte values, least significant first.

    Negative values are first shifted up by 0x1000000 (24-bit wrap-around).
    """
    value = i + 0x1000000 if i < 0 else i
    return [(value >> shift) & 0xFF for shift in (0, 8, 16)]
def write_playback_state():
    """Reset the playback position stored in iTunesPState.

    The existing file is read if possible. If it is missing, unreadable or
    shorter than 21 bytes, a default state is used instead; without that
    default the slice assignments below would produce a truncated file with
    shifted fields. Bytes 3..14 are then reset, and the first three bytes are
    replaced by ``args.volume`` when a volume was given.

    Returns 1 on success and 0 if the file could not be written.
    """
    log("Setting playback state ...", False)
    try:
        with open("iPod_Control/iTunes/iTunesPState", "rb") as f:
            p_state = list(f.read())
    except IOError:
        p_state = []
    if len(p_state) < 21:
        p_state = listval(29) + [0] * 15 + listval(1)  # volume 29, FW ver 1.0
    p_state[3:15] = [0] * 6 + [1] + [0] * 5  # track 0, shuffle mode, start of track
    if args.volume is not None:
        p_state[:3] = listval(args.volume)
    try:
        with open("iPod_Control/iTunes/iTunesPState", "wb") as f:
            f.write(bytes(p_state))
    except IOError:
        log("FAILED.")
        return 0
    log("OK.")
    return 1
def write_stats(count):
    """Write a fresh iTunesStats file with one blank record per track.

    The file starts with *count* as a three-byte value followed by three zero
    bytes; after that come *count* identical 18-byte records. The file is
    closed even if writing fails.

    Returns 1 on success and 0 if the file could not be written.
    """
    log("Creating statistics file ...", False)
    # presumably the leading 18 is the record length (3 + 3 + 12 bytes) -- TODO confirm
    record = stringval(18) + b"\xff" * 3 + b"\0" * 12
    try:
        with open("iPod_Control/iTunes/iTunesStats", "wb") as stats:
            stats.write(stringval(count) + b"\0" * 3 + record * count)
    except IOError:
        log("FAILED.")
        return 0
    log("OK.")
    return 1
################################################################################
def smart_shuffle():
    """Build a play order that spreads the tracks of each domain apart.

    The sequence is divided into as many slices as the largest domain has
    tracks. Every track is put into a randomly chosen slice that is far away
    (measured circularly) from the slices already holding tracks of the same
    domain, preferring the emptier ones. Each slice is then shuffled, and if
    it would start with the domain the previous slice ended with, its first
    track is moved to its end.

    Returns the list of track indices, or an empty list when there are no
    domains.
    """
    try:
        slice_count = max(len(domain) for domain in domains)
    except ValueError:
        return []
    slices = [[] for _ in range(slice_count)]
    slice_fill = [0] * slice_count
    for d, tracks in enumerate(domains):
        if not tracks:
            continue
        used = []
        for n in tracks:
            # find slices where the nearest track of the same domain is far away
            metric = []
            for s in range(slice_count):
                nearest = slice_count
                for u in used:
                    gap = s - u
                    nearest = min(nearest, abs(gap), abs(gap + slice_count), abs(gap - slice_count))
                metric.append(nearest)
            far_limit = (max(metric) + 1) // 2
            farthest = [s for s, distance in enumerate(metric) if distance >= far_limit]
            # find emptiest slices
            fill_limit = (min(slice_fill) + max(slice_fill) + 1) // 2
            emptiest = [s for s in farthest if slice_fill[s] <= fill_limit]
            # choose one of the remaining candidates and add the track to the chosen slice
            chosen = random.choice(emptiest or farthest)
            slices[chosen].append((n, d))
            slice_fill[chosen] += 1
            used.append(chosen)
    # shuffle slices and avoid adjacent tracks of the same domain at slice boundaries
    seq = []
    last_domain = -1
    for bucket in slices:
        random.shuffle(bucket)
        if len(bucket) > 2 and bucket[0][1] == last_domain:
            bucket.append(bucket.pop(0))
        seq.extend(track for track, _ in bucket)
        last_domain = bucket[-1][1]
    return seq
def write_shuffle(count):
    """Generate a play order and store it in iTunesShuffle.

    With ``args.nosmart`` the indices ``0 .. count-1`` are shuffled uniformly;
    otherwise smart_shuffle() supplies the order. Every index is written as
    a three-byte value.

    Returns 1 on success and 0 if the file could not be written.
    """
    random.seed()
    if not args.nosmart:
        log("Generating smart shuffle sequence ...", False)
        seq = smart_shuffle()
    else:
        log("Generating shuffle sequence ...", False)
        seq = list(range(count))
        random.shuffle(seq)
    try:
        with open("iPod_Control/iTunes/iTunesShuffle", "wb") as shuffle_file:
            shuffle_file.write(b"".join(stringval(track) for track in seq))
    except IOError:
        log("FAILED.")
        return 0
    log("OK.")
    return 1
################################################################################
def main(dirs):
    """Rebuild the iPod shuffle database for the given directories.

    Steps:
      1. load additional rules from ``rebuild_db.rules`` if that file exists;
      2. read the headers (and, unless ``--force`` is given, the entries) of
         the existing iTunesSD so they can be reused;
      3. write a new iTunesSD by browsing every directory in *dirs*;
      4. rewrite the playback state, statistics and shuffle files.

    Exits with status 1 when the iPod control directory is missing or
    iTunesSD cannot be written.
    """
    global header, iTunesSD_file, total_count, KnownEntries, Rules
    log(f"Welcome to ShuffleDB, version {__version__}")
    log()
    try:
        with open("rebuild_db.rules", "r") as rules_file:
            Rules += [_f for _f in map(ParseRuleLine, rules_file.read().split("\n")) if _f]
    except IOError:
        pass
    if not os.path.isdir("iPod_Control/iTunes"):
        log("""ERROR: No iPod control directory found!
Please make sure that:
(*) this program's working directory is the iPod's root directory
(*) the iPod was correctly initialized with iTunes""")
        sys.exit(1)
    header = array.array('B')
    try:
        with open("iPod_Control/iTunes/iTunesSD", "rb") as old_db:
            # 18 bytes of main header followed by the 33-byte header of the
            # first entry; fromfile() keeps whatever it could read on EOFError.
            header.fromfile(old_db, 51)
            if not args.force:
                old_db.seek(18)
                entry = old_db.read(558)
                while len(entry) == 558:
                    # The name field (bytes 33..554) holds two bytes per
                    # character. Decode it to str so the key matches the str
                    # file names that write_to_db() looks up.
                    name_field = entry[33:555].decode("utf-16-le", errors="replace")
                    filename = name_field.split("\0", 1)[0]
                    KnownEntries[filename] = entry
                    entry = old_db.read(558)
    except (IOError, EOFError):
        pass
    if len(header) == 51:
        log("Using iTunesSD headers from existing database.")
        if KnownEntries:
            log(f"Collected {len(KnownEntries)} entries from existing database.")
    else:
        del header[18:]
        if len(header) == 18:
            log("Using iTunesSD main header from existing database.")
        else:
            del header[:]
            log("Rebuilding iTunesSD main header from scratch.")
            header.fromlist([0, 0, 0, 1, 6, 0, 0, 0, 18] + [0] * 9)
        log("Rebuilding iTunesSD entry header from scratch.")
        header.fromlist([0, 2, 46, 90, 165, 1] + [0] * 20 + [100, 0, 0, 1, 0, 2, 0])
    log()
    try:
        iTunesSD_file = open("iPod_Control/iTunes/iTunesSD", "wb")
        header[:18].tofile(iTunesSD_file)
    except IOError:
        log("""ERROR: Cannot write to the iPod database file (iTunesSD)!
Please make sure that:
(*) you have sufficient permissions to write to the iPod volume
(*) you are actually using an iPod shuffle, and not some other iPod model :)""")
        sys.exit(1)
    # From here on ``header`` is the 33-byte entry header template.
    del header[:18]
    log("Searching for files on your iPod.")
    try:
        for directory in dirs:
            browse(directory, args.interactive)
        log(f"{total_count} playable files were found on your iPod.")
        log()
        log("Fixing iTunesSD header.")
        iTunesSD_file.seek(0)
        # Track count as three bytes, most significant first; unlike "%c"
        # formatting this does not fail for 65536 or more tracks.
        iTunesSD_file.write(bytes([
            (total_count >> 16) & 0xFF,
            (total_count >> 8) & 0xFF,
            total_count & 0xFF,
        ]))
        iTunesSD_file.close()
    except IOError:
        log("ERROR: Some strange errors occured while writing iTunesSD.")
        log(" You may have to re-initialize the iPod using iTunes.")
        sys.exit(1)
    # Run all three writers even if one of them fails.
    results = [
        write_playback_state(),
        write_stats(total_count),
        write_shuffle(total_count),
    ]
    if all(results):
        log()
        log("The iPod shuffle database was rebuilt successfully.")
        log("Have fun listening to your music!")
    else:
        log()
        log("WARNING: The main database file was rebuilt successfully, but there were errors")
        log(" while resetting the other files. However, playback MAY work correctly.")
################################################################################
# Script entry point: parse the command line, open the log, run main() and
# always close the log again -- also when main() leaves via sys.exit().
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(
        description="Rebuild iPod shuffle database. Updated fork of Martin Fiedler's rebuild_db (https://shuffle-db.sourceforge.net/)",
    )
    parser.add_argument('directories', nargs='*', default=["."])
    parser.add_argument("-i", "--interactive", action="store_true", help="prompt before browsing each directory")
    parser.add_argument("-s", "--nosmart", action="store_true", help="do not use smart shuffle")
    parser.add_argument("-l", "--nolog", action="store_true", help="do not create a log file")
    parser.add_argument("-f", "--force", action="store_true", help="always rebuild database entries, do not re-use old ones")
    parser.add_argument("-L", "--logfile", action="store", default="rebuild_db.log.txt", help="set log file name")
    parser.add_argument("-r", "--rename", action="store_true", help="automatically rename files to safe names")
    parser.add_argument("-v", "--volume", type=int, action="store", help="set playback volume to a value between 0 and 38")
    args = parser.parse_args()
    open_log()
    try:
        main(args.directories)
    except KeyboardInterrupt:
        log()
        log("You decided to cancel processing. This is OK, but please note that")
        log("the iPod database is now corrupt and the iPod won't play!")
    finally:
        close_log()