M src/pushbroom/console.py => src/pushbroom/console.py +31 -24
@@ 1,3 1,6 @@
+"""
+Pushbroom entry point
+"""
import argparse
import configparser
import fnmatch
@@ 5,10 8,13 @@ import logging
import os
import re
import sys
+from pathlib import Path
from pushbroom import sweep, __version__
+
def run():
+ """Main entry point"""
args = parse_args()
setup_logging(args)
config = read_config(args.config)
@@ 20,8 26,8 @@ def pushbroom(config, dry_run=False):
logging.info("Starting pushbroom")
for section in config.sections():
path = config.get(section, "path")
- fullpath = os.path.abspath(os.path.expanduser(path))
- if not os.path.isdir(fullpath):
+ fullpath = Path(path).expanduser().absolute()
+ if not fullpath.is_dir():
logging.error("No such directory: %s", fullpath)
else:
num_days = config.getint(section, "numdays")
@@ 37,8 43,8 @@ def pushbroom(config, dry_run=False):
logging.warning("Ignoring 'Shred' option while 'Trash' is set")
shred = False
- trash = os.path.abspath(os.path.expanduser(trash))
- if not os.path.isdir(trash):
+ trash = Path(trash).expanduser().absolute()
+ if not trash.is_dir():
logging.error("No such directory %s", trash)
sweep(
@@ 71,46 77,47 @@ def setup_logging(args):
"""Set up logging"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
- ch = logging.StreamHandler()
- ch.setFormatter(logging.Formatter("%(message)s"))
- ch.setLevel(logging.ERROR)
+ stream_handler = logging.StreamHandler()
+ stream_handler.setFormatter(logging.Formatter("%(message)s"))
+ stream_handler.setLevel(logging.ERROR)
if not args.dry_run:
# If not doing a dry run log, to a file
- log_file = os.path.join(
- os.environ.get("XDG_CACHE_HOME", os.path.expanduser("~/.cache")),
- "pushbroom",
- "pushbroom.log",
+ log_file = (
+ Path(os.environ.get("XDG_CACHE_HOME", Path("~/.cache").expanduser()))
+ .joinpath("pushbroom")
+ .joinpath("pushbroom.log")
)
- os.makedirs(os.path.dirname(log_file), exist_ok=True)
- fh = logging.FileHandler(log_file)
- fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
- fh.setLevel(logging.INFO)
- logger.addHandler(fh)
+ log_file.parent.mkdir(parents=True, exist_ok=True)
+ file_handler = logging.FileHandler(log_file)
+ fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
+ file_handler.setFormatter(fmt)
+ file_handler.setLevel(logging.INFO)
+ logger.addHandler(file_handler)
if args.verbose or args.dry_run:
# If verbose or doing a dry run print info to console
- ch.setLevel(logging.INFO)
+ stream_handler.setLevel(logging.INFO)
- logger.addHandler(ch)
+ logger.addHandler(stream_handler)
def read_config(conf_file=None):
"""Find and read configuration file"""
if not conf_file:
# Look under XDG_CONFIG_HOME first, then look for ~/.pushbroomrc
- conf_file = os.path.join(
- os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")),
- "pushbroom",
- "config",
+ conf_file = (
+ Path(os.environ.get("XDG_CONFIG_HOME", Path("~/.config").expanduser()))
+ .joinpath("pushbroom")
+ .joinpath("config")
)
if not os.path.exists(conf_file):
conf_file = os.path.expanduser("~/.pushbroomrc")
config = configparser.ConfigParser()
try:
- with open(conf_file, "r") as f:
- config.read_file(f)
+ with open(conf_file, "r") as fil:
+ config.read_file(fil)
except FileNotFoundError:
logging.error("Configuration file %s not found", conf_file)
sys.exit(1)
M src/pushbroom/sweep.py => src/pushbroom/sweep.py +14 -9
@@ 2,18 2,23 @@ import logging
import os
import re
import time
+from pathlib import Path
SECONDS_PER_DAY = 24 * 60 * 60
def delete(path, shred):
+ """Delete the file at the given path.
+
+ If ``shred`` is True, first write over the file with random data before deleting.
+ """
if shred:
- with open(path, "ba+") as f:
- length = f.tell()
- f.seek(0)
- f.write(os.urandom(length))
+ with open(path, "ba+") as fil:
+ length = fil.tell()
+ fil.seek(0)
+ fil.write(os.urandom(length))
- os.remove(path)
+ Path(path).unlink()
def sweep(name, path, num_days, ignore, match, trash, dry_run, shred):
@@ 37,17 42,17 @@ def sweep(name, path, num_days, ignore, match, trash, dry_run, shred):
dirs[:] = [d for d in dirs if re.match(match, d) and not re.match(ignore, d)]
files = [f for f in files if re.match(match, f) and not re.match(ignore, f)]
for file in files:
- fpath = os.path.join(root, file)
- if not os.path.exists(fpath):
+ fpath = Path(root).joinpath(file)
+ if not fpath.exists():
continue
- if os.stat(fpath).st_mtime >= thresh:
+ if fpath.stat().st_mtime >= thresh:
continue
if trash:
logging.info("Moving %s to %s", fpath, trash)
if not dry_run:
- os.rename(fpath, os.path.join(trash, file))
+ fpath.rename(Path(trash).joinpath(fpath.name))
else:
if shred:
logging.info("Securely deleting %s", fpath)