# MousikóFídi
# Copyright (C) 2019 Hristos N. Triantafillou
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
The directory '{}' could not be read due to a permissions error!
""".format( path ) ) dir_list = [] item_list = [] file_list = [] _audio_list = [] _video_list = [] for i in dir_items: item_path = os.path.join(path, i) metadata = get_metadata_dict(item_path) if is_audio_file(item_path): dbg("Audio found: " + i) file_list.append(item_path) _audio_list.append(file_dict(item_path, metadata, "audio")) item_list.append(file_dict(item_path, metadata, "audio")) context["add_all_button"] = True context["audio_player"] = True elif is_video_file(item_path): dbg("Video found: " + i) file_list.append(item_path) _video_list.append(file_dict(item_path, metadata, "video")) item_list.append(file_dict(item_path, metadata, "video")) context["add_all_button"] = True context["video_player"] = True elif os.path.isdir(item_path): dbg("Dir found: " + i) dir_list.append(dir_dict(item_path)) audio_list = make_unique_slugs(_audio_list) video_list = make_unique_slugs(_video_list) music_dirs = [] for d in context["music_dirs"]: music_dirs.append(d["full_path"]) context["cover_art"] = select_cover_art(path) context["file_list"] = file_list context["item_list"] = item_list context["audio_list"] = audio_list context["video_list"] = video_list context["page_name"] = path context["page_path"] = breadcrumb_links_from_path(path, music_dirs) context["playlist_add"] = True context["playlist_rm"] = False context["item_type"] = "dir" context["dir_list"] = dir_list return context def browse_file(context: dict, path: str) -> dict: dbg("Reading File: " + path) file_name = path.split("/")[-1] metadata = get_metadata_dict(path) if is_audio_file(path): context["item_type"] = "audio" elif is_video_file(path): context["item_type"] = "video" music_dirs = [] for d in context["music_dirs"]: music_dirs.append(d["full_path"]) for tag in ( "album", "artist", "comment", "date", "encoded_by", "genre", "length", "lyrics", "title", "track", "tracktotal", ): try: context[tag] = metadata[tag] except KeyError: # A given track may have any or none of the above tags. pass context["cover_art"] = select_cover_art(path) context["page_name"] = metadata["title"] or file_name context["full_path"] = path context["path"] = path.strip("/") context["file_name"] = file_name context["page_path"] = breadcrumb_links_from_path(path, music_dirs) return context def config_to_string(config_file: str) -> str: try: with open(config_file, "r") as f: lines = f.readlines() return "".join(lines) except FileNotFoundError: pass def request_context(config_data: dict) -> dict: favicon = select_logo(config_data, "favicon_path") logo = select_logo(config_data, "logo_path") m = config_data["config"]["music_dirs"] music_dirs = paths_list(m) playlists = list_playlists(config_data["config"]["playlist"]["dir"]) try: icons = session["icons"] except KeyError: icons = config_data["config"]["icons"] try: queue = session["queue"] except KeyError: queue = [] try: username = session["username"] except KeyError: username = None css, theme = select_css() return { "css": css, "debug": debug, "favicon_path": favicon, "icons": icons, "logo_path": logo, "music_dirs": music_dirs, "playlist_dir": config_data["config"]["playlist"]["dir"], "playlist_save": config_data["config"]["playlist"]["save"], "playlists": playlists, "preload_audio": config_data["config"]["preload_audio"], "preload_video": config_data["config"]["preload_video"], "search": config_data["config"]["search"], "secret_key": config_data["config"]["secret_key"], "site_name": config_data["config"]["site_name"], "queue": queue, "theme": theme, "username": username, } def init( fidi_config=None, use_config=( os.getenv("FIDI_CONFIG_PATH") or os.getenv("FIDI_CONFIG") or os.getenv("FIDI_CFG_PATH") or os.getenv("FIDI_CFG") or None ), ) -> dict: if use_config: fidi_config = use_config if fidi_config and os.path.isfile(fidi_config): dbg("Reading User-Supplied Config: " + fidi_config) # uwsgi blows up if we don't first test local_config_file here.. elif local_config_file and os.path.isfile(local_config_file): dbg("Reading Module-Local Config: " + local_config_file) fidi_config = local_config_file elif os.path.isfile(user_config_file): dbg("Reading User Config: " + user_config_file) fidi_config = user_config_file if fidi_config: c = yaml.load(config_to_string(fidi_config), Loader=yaml.BaseLoader) else: wrn("No CFG found or given, creating a default for use.") cfg = """config: cover_art: true favicon_path: /fidi.png holidays: true icons: false logo_path: /fidi.png music_dirs: - {home}/music - {home}/video playlist: dir: {home}/music/playlists save: true preload_audio: false preload_video: false secret_key: {secret} site_name: MousikóFídi - Your Music Cloud theme: light""".format( home=os.getenv("HOME"), secret=os.urandom(24) ) with open(user_config_file, "w") as f: for line in cfg.split("\n"): f.write(line + "\n") c = yaml.load(config_to_string(user_config_file), Loader=yaml.BaseLoader) # TODO: A more DRY way to handle checking for configs try: if c["config"]["cover_art"].lower() == "true": c["config"]["cover_art"] = True else: c["config"]["cover_art"] = False except KeyError: wrn( "No 'cover_art' value was found in the configuration file! Defaulting to on..." ) c["config"]["cover_art"] = True try: if c["config"]["holidays"].lower() == "true": c["config"]["holidays"] = True else: c["config"]["holidays"] = False except KeyError: wrn( "No 'holidays' value was found in the configuration file! Defaulting to on..." ) c["config"]["holidays"] = True try: if c["config"]["icons"].lower() == "true": c["config"]["icons"] = True else: c["config"]["icons"] = False except KeyError: wrn( "No 'icons' value was found in the configuration file! Defaulting to off..." ) c["config"]["icons"] = False # TODO: Re-enable this when db support is added # try: # if c["config"]["search"].lower() == "true": # c["config"]["search"] = True # else: # c["config"]["search"] = False # except KeyError: # wrn( # "No 'search' value was found in the configuration file! Defaulting to off..." # ) # c["config"]["search"] = False c["config"]["search"] = False try: app.secret_key = c["config"]["secret_key"] # Sort of obscure the key c["config"]["secret_key"] = True except KeyError: err( "No 'secret_key' was found in the configuration file! Related functionality will be disabled..." ) c["config"]["secret_key"] = None try: if c["config"]["playlist"]["save"].lower() == "true": c["config"]["playlist"]["save"] = True else: c["config"]["playlist"]["save"] = False except KeyError: wrn( "No 'playlist.save' value was found in the configuration file! Defaulting to off..." ) c["config"]["playlist"]["save"] = False try: if c["config"]["preload_audio"].lower() == "true": c["config"]["preload_audio"] = True else: c["config"]["preload_audio"] = False except KeyError: wrn( "No 'preload_audio' value was found in the configuration file! Defaulting to off..." ) c["config"]["preload_audio"] = False try: if c["config"]["preload_video"].lower() == "true": c["config"]["preload_video"] = True else: c["config"]["preload_video"] = False except KeyError: wrn( "No 'preload_video' value was found in the configuration file! Defaulting to off..." ) c["config"]["preload_video"] = False try: theme = c["config"]["theme"].lower() if theme in THEMES.keys(): dbg("Using the configured theme: " + theme) else: wrn("Unrecognized theme: " + theme) wrn("Using the default theme: light") c["config"]["theme"] = "light" except KeyError: wrn( "No 'theme' value was found in the configuration file! Defaulting to light..." ) c["config"]["theme"] = "light" return c def get_metadata_dict(file_path: str) -> dict: d = { "artist": None, "album": None, "date": None, "genre": None, "length": None, "title": None, "track": None, "tracktotal": None, } metadata = file_metadata(file_path) if is_audio_file(file_path): d.update( { "album": get_metadata_value( [ "ALBUM", "album", # for mp3 "TALB", ], metadata, ) } ) d.update( { "artist": get_metadata_value( [ # TODO: make this ordering configurable "ARTIST", "ARTIST_CREDIT", "ARTISTSORT", "artist", "ALBUMARTIST", "ALBUMARTIST_CREDIT", "ALBUMARTISTSORT", # for mp3 "TPE1", ], metadata, ) } ) d.update( { "date": get_metadata_value( [ "DATE", "ORIGINALDATE", "YEAR", "date", # for mp3 "TDRC", ], metadata, ) } ) d.update( { "genre": get_metadata_value( [ "GENRE", "genre", # for mp3 "TCON", ], metadata, ) } ) d.update( { "title": get_metadata_value( [ "TITLE", "title", # for mp3 "TIT2", ], metadata, ) } ) d.update({"track": get_metadata_value(["TRACK", "TRACKNUMBER"], metadata)}) tracktotal = get_metadata_value( ["TRACKTOTAL", "TRACKC", "TOTALTRACKS", "TRCK"], metadata ) if tracktotal and "/" in tracktotal: # MP3s commonly have the track number and total stored as one value... trackdata = tracktotal.split("/") d.update({"track": trackdata[0].strip()}) d.update({"tracktotal": trackdata[1].strip()}) else: d.update({"tracktotal": tracktotal}) d.update({"comment": get_metadata_value(["COMMENT", "COMM"], metadata)}) d.update({"encoded_by": get_metadata_value(["ENCODED-BY", "TENC"], metadata)}) d.update({"lyrics": get_metadata_value(["LYRICS", "USLT"], metadata)}) elif is_video_file(file_path): if file_path.endswith(".mp4"): for k, v in metadata.items(): if "ART" in k: d.update({"artist": v}) elif "alb" in k: d.update({"album": v}) elif "cmt" in k: d.update({"comment": v}) elif "day" in k: d.update({"date": v}) elif "enc" in k: d.update({"encoded_by": v}) elif "gen" in k: d.update({"genre": v}) elif "lyr" in k: d.update({"lyrics": v}) elif "nam" in k: d.update({"title": v}) elif "trkn" in k: track_list = v.strip("(").strip(")").split(",") if track_list[0] != "0": d.update({"track": track_list[0].strip(" ")}) if track_list[1] != "0": d.update({"tracktotal": track_list[1].strip(" ")}) d.update({"length": get_metadata_value(["length"], metadata)}) return d def get_metadata_value(key_list: list, metadata: dict) -> T[str, None]: for key in key_list: try: return metadata[key].strip() except KeyError: pass return None def dir_dict(path: str) -> dict: return {"name": path.split(os.path.sep)[-1], "path": path.strip("/")} def file_dict(path: str, metadata: dict, ftype: str, title_limit=17) -> dict: file_name = path.split(os.path.sep)[-1] file_name_mobile = path.split(os.path.sep)[-1] title = metadata["title"] title_mobile = title if len(file_name.split()) == 1 and len(file_name) > title_limit: file_name_mobile = file_name[:title_limit] + "…" if title: if len(title.split()) == 1 and len(title) > title_limit: title_mobile = title[:title_limit] + "…" return { "album": metadata["album"], "artist": metadata["artist"], "genre": metadata["genre"], "length": metadata["length"], "title": title, "title_mobile": title_mobile, "track": metadata["track"], "type": ftype, "tracktotal": metadata["tracktotal"], "file_name": file_name, "file_name_mobile": file_name_mobile, "file_path": path.strip("/"), "slug": title_slug(title or file_name), } def breadcrumb_links_from_path(path: str, music_dirs: list) -> str: link_string = "" path_string = "" for d in music_dirs: if path.startswith(d): _path = d.strip("/") link_string += '{path}'.format( url=url_for(".dir_detail", path=_path), path=d ) path_string += d new_path = path.replace(d, "").strip("/") dir_list = new_path.split("/") for dd in dir_list: if dd: path_string = os.path.join(path_string, dd) if os.path.isdir(path_string): link_string += ' / {name}'.format( name=dd, url=url_for(".dir_detail", path=path_string.strip("/")), ) elif os.path.isfile(path_string): link_string += " / {}".format(dd) return link_string def get_playlists(pdir: str) -> list: plist_contents = [] plists = [] if os.path.isdir(pdir): for plist in os.listdir(pdir): ppath = os.path.join(pdir, plist) with open(ppath, "r") as f: plist_contents = f.readlines() plists.append( { "name": plist.split(".m3u")[0], "filename": plist, "count": len(plist_contents), } ) return sorted(plists, key=lambda n: n["name"]) return list() def handle_playlist_cmd(cmd: str, path: str, context: dict) -> dict: # This is only used in one view but was created in order to keep # all of this business out of the main view function. flashme = """The playlist add operation could not be completed because it would cause the playlist to exceed the max cookie size!
See this ticket for more information about how this will be fixed.
""" p_size = len(bytes(str(context["queue"]), "utf8")) dbg("p_size: " + str(p_size)) if cmd == "add": new_size = p_size + len(bytes(str(path), "utf8")) if new_size < MAX_COOKIE_SIZE: dbg("Adding item to playlist: " + path) context["queue"].append(path) else: dbg("Cookie overflow detected, preventing add operation...") context["flashed"] = True flash(flashme) elif cmd == "bulk": bulk = ast.literal_eval(request.form["bulk-list"]) new_size = p_size + len(bytes(str(bulk), "utf8")) if new_size < MAX_COOKIE_SIZE: dbg("Bulk adding items to playlist:") for path in bulk: dbg("Adding: " + path) context["queue"].append(path) flash( """All files in this dir added to your queue!
""" ) else: dbg("Cookie overflow detected, preventing bulk operation...") flash(flashme) elif cmd == "clear": if path == "/all": if len(bytes(str(context["queue"]), "utf8")) < MAX_COOKIE_SIZE: flash( # TODO: Use url_for here """Your queue was cleared!
""".format( quote(str(context["queue"])) ) ) else: flash( """Your queue was cleared but it was too big for an undo!
""" ) context["queue"] = [] elif path == "/undo": dbg("Undoing playlist clear!") context["queue"] = ast.literal_eval( urllib.parse.unquote(request.form["to-load"]) ) flash('The playlist clear was undone!
') elif cmd == "load": playlist_dir = context["playlist_dir"] to_load = request.form["to-load"] new_size = p_size + len(bytes(str(to_load), "utf8")) full_path = os.path.join(playlist_dir, to_load + ".m3u") if "delete" in request.form or "really-delete" in request.form: if "delete" in request.form: if os.path.isfile(full_path): flash( """""".format( to_load ) ) return context elif "really-delete" in request.form: os.remove(full_path) flash( 'The playlist "{}" was deleted!
'.format( to_load ) ) return context if new_size < MAX_COOKIE_SIZE: load_error = False loaded = [] if os.path.isfile(full_path): dbg("Loading playlist: " + full_path) content = [] with open(full_path, "r") as f: content = f.readlines() if content: for line in content: _line = line.rstrip() if os.path.isfile(_line) and is_valid_path(context, _line): dbg("Loading track: " + _line) context["queue"].append(_line) loaded.append(_line) else: load_error = 'The playlist "{}" was loaded with errors!
' err("Could not load track path: " + _line) else: load_error = 'The playlist "{}" is empty and cannot be loaded!
'.format( to_load ) if not loaded: load_error = 'The playlist "{}" was unable to load due to errors!
' if load_error: flash(load_error.format(to_load)) else: flash( 'The playlist "{}" was loaded!
'.format( to_load ) ) else: dbg("Cookie overflow detected, preventing load operation...") flash(flashme) elif cmd == "rm": dbg("Removing item from playlist: " + path) flash( """The track '{}' was removed from the playlist!
""".format( get_metadata_dict(path)["title"] ) ) context["queue"].remove(path) elif cmd == "save": allowed_chars = (" ", "_", "+") bulk_list = request.form.get("bulk-list") if bulk_list: bulk_paths = ast.literal_eval(bulk_list) else: bulk_paths = ast.literal_eval( urllib.parse.unquote(request.form.get("to-save")) ) playlist_content = [] playlist_dir = context["playlist_dir"] raw_file_name = request.form["file-name"] cleaned_file_name = "".join( s for s in raw_file_name if s.isalnum() or s in allowed_chars ) full_path_file_name = os.path.join(playlist_dir, cleaned_file_name) + ".m3u" if not cleaned_file_name: flash( 'Playlist names may only contain alphanumeric characters, spaces, underscores, or plus signs!
' ) return context if os.path.isfile(full_path_file_name): if "save" in request.form: flash( """""".format( cleaned_file_name, quote(str(context["queue"])) ) ) return context elif "really-save" in request.form: pass for p in bulk_paths: playlist_content.append(p) playlist_content.append("\n") with open(full_path_file_name, "w") as f: for line in playlist_content: f.write(line) dbg("Playlist written: " + full_path_file_name) flash( 'The playlist "{}" was saved.
'.format( cleaned_file_name ) ) return context def is_audio_file(file_path: str) -> bool: return os.path.isfile(file_path) and ( file_path.endswith(".flac") or file_path.endswith(".mp3") or file_path.endswith(".ogg") ) def is_video_file(file_path: str) -> bool: return os.path.isfile(file_path) and ( file_path.endswith(".mp4") or file_path.endswith(".webm") ) def is_valid_path(request_context: dict, path: str) -> bool: _path = path.strip("/").rstrip("/") abs_path = os.path.abspath(os.path.sep + _path) for d in request_context["music_dirs"]: full_path = d["full_path"] if abs_path.startswith(full_path): return True return False def list_playlists(playlist_dir: str) -> list: playlists = [] dbg("Checking playlist dir: " + playlist_dir) if os.path.isdir(playlist_dir): contents = os.listdir(playlist_dir) if contents: for i in contents: if i.endswith(".m3u"): p = os.path.join(playlist_dir, i) playlists.append(p) return sorted(playlists) def make_unique_slugs(item_list: list) -> list: used_slugs = [] count = 0 slug_extra = 0 for i in item_list: if "slug" in i.keys(): if i["slug"] in used_slugs: newslug = i["slug"] + str(slug_extra) while newslug in used_slugs: slug_extra += 1 newslug = i["slug"] + str(slug_extra) item_list[count]["slug"] = newslug used_slugs.append(newslug) else: used_slugs.append(i["slug"]) count += 1 return item_list def paths_list(music_dirs: list) -> list: dl = [] for md in music_dirs: path = md.strip("/") dl.append({"full_path": os.path.join(os.path.sep, path), "path": path}) return dl def search_files(q: str, only_audio: bool, only_video: bool) -> list: hits = [] mdirs = app.fidiConfig["config"]["music_dirs"] audio_exts = ("flac", "mp3", "ogg") video_exts = ("mp4", "webm") _exts = audio_exts + video_exts for topdir in mdirs: for dirpath, dirnames, files in os.walk(topdir): for name in files: ext = name.lower().split(".")[-1] if ext in _exts: _file = os.path.join(dirpath, name) md = get_metadata_dict(_file) if not only_video and ext in audio_exts: for k, v in file_dict(_file, md, "audio").items(): if ( v and "_mobile" not in k and "_path" not in k and "slug" not in k ): if _file not in hits and ( q in v.lower() or q in _file.lower() ): hits.append(_file) elif not only_audio and ext in video_exts: for k, v in file_dict(_file, md, "video").items(): if ( v and "_mobile" not in k and "_path" not in k and "slug" not in k ): if _file not in hits and ( q in v.lower() or q in _file.lower() ): hits.append(_file) return hits def select_cover_art(path: str) -> str: if not app.fidiConfig["config"]["cover_art"]: return None cover_art = None image_exts = ("jpg", "jpeg", "png") images = [] if os.path.isfile(path): _dir = os.path.dirname(path) elif os.path.isdir(path): _dir = path for filename in os.listdir(_dir): for ext in image_exts: if filename.endswith(ext): images.append(filename) if len(images) == 1: cover_art = os.path.join(_dir, images[0]) for img in images: _img = img.lower() if "cover" in _img or "folder" in _img or "front" in _img or "cover" in _img: cover_art = os.path.join(_dir, img) break if cover_art: return url_for(".serve_file", path=cover_art.strip("/")) elif images: return url_for(".serve_file", path=os.path.join(_dir, images[0]).strip("/")) wrn("No cover art found for dir: " + path) def select_logo(config: dict, item: str, fakenow=None) -> str: # Don't spoil a user's custom settings if config["config"]["favicon_path"] != "/fidi.png": return config["config"]["favicon_path"] # Same here if config["config"]["logo_path"] != "/fidi.png": return config["config"]["logo_path"] if config["config"]["holidays"]: logo = config["config"]["logo_path"] if fakenow: now = fakenow else: now = datetime.datetime.now() for logo_date in LOGOS.keys(): if "-" in logo_date: sdate = logo_date.split("-") if len(sdate) == 2: month, date = sdate today_month, today_date = now.strftime("%m-%d").split("-") if month == today_month and date == today_date: dbg( "Activating holiday {item} for '{date}'!".format( item=item, date=logo_date ) ) logo = "/" + LOGOS[logo_date] elif ":" in logo_date: date_type, date = logo_date.split(":") if date_type == "day": if date == now.strftime("%d"): dbg( "Activating holiday {item} for '{date}'!".format( item=item, date=logo_date ) ) logo = "/" + LOGOS[logo_date] elif date_type == "month": if date == now.strftime("%m"): dbg( "Activating holiday {item} for '{date}'!".format( item=item, date=logo_date ) ) logo = "/" + LOGOS[logo_date] elif logo_date == "*": logo = "/" + LOGOS[logo_date] if debug: logo = "/static" + logo return logo else: # No holidays! if debug: return "/static" + config["config"][item] return config["config"][item] def select_css() -> tuple: if debug: ext = ".css" path = "/static/" else: ext = ".min.css" path = "/" try: theme = session["theme"] except KeyError: theme = app.fidiConfig["config"]["theme"] css = [ path + "css/normalize" + ext, path + "fa/css/fontawesome" + ext, path + "fa/css/solid" + ext, path + "css" + THEMES[theme] + ext, path + "css/fidi" + ext, ] if theme == "nes": css.append(path + "css/fidi-nes" + ext) return css, theme def title_slug(title: str, slug_limit=20) -> str: return "".join(thing for thing in title if thing.isalnum()).lower()[:slug_limit] app.fidiConfig = init() # Begin routes @app.errorhandler(404) def not_found(e): c = request_context(app.fidiConfig) c["code"] = 404 c["error_text"] = "The page you requested does not exist!" c["page_name"] = "404 Not Found" return (render_template("error.html", **c), c["code"]) @app.errorhandler(500) def internal_server_error(e): c = request_context(app.fidiConfig) c["code"] = 500 c[ "error_text" ] = "A programming error has occured. Check the application log for more information." c["page_name"] = "Internal Server Error" return (render_template("error.html", **c), c["code"]) @app.route("/") def index(): c = request_context(app.fidiConfig) c["page_name"] = "Welcome" c["plists"] = get_playlists(c["playlist_dir"]) return render_template("index.html", **c) @app.route("/about") def about(): c = request_context(app.fidiConfig) c["page_name"] = "About MousikóFídi" return render_template("about.html", **c) @app.route("/browse") def browse(): c = request_context(app.fidiConfig) c["dir_list"] = c["music_dirs"] c["page_name"] = "Media Dirs" c["plists"] = get_playlists(c["playlist_dir"]) c["top_link"] = True return render_template("dirs.html", **c) @app.route("/browse/The "Only Audio" and "Only Video" options cannot be used together!
""" ) elif q: hits = search_files(q, only_audio, only_video) if hits: for hit in hits: md = get_metadata_dict(hit) if is_audio_file(hit): _audio_list.append(file_dict(hit, md, "audio")) elif is_video_file(hit): _video_list.append(file_dict(hit, md, "video")) audio_list = make_unique_slugs(_audio_list) video_list = make_unique_slugs(_video_list) c["hits"] = hits c["audio_list"] = audio_list c["video_list"] = video_list c["only_audio"] = only_audio c["only_video"] = only_video c["q"] = q # TODO: The add all button here # c["add_all_button"] = True c["link_button"] = True c["top_link"] = True return render_template("search.html", **c) @app.route("/serve/