"""Built-in extensions for roland: history, downloads, sessions, TLS/HSTS
handling, user content, DBus API and the password manager."""
import base64
import contextlib
import datetime
import hashlib
import itertools
import json
import os
import re
import sqlite3
from collections import namedtuple
from urllib import request, parse as urlparse

import logbook
import msgpack
from Crypto import Random
from Crypto.Cipher import AES
from gi.repository import Gio, WebKit2
from werkzeug import parse_dict_header

from .utils import config_path


log = logbook.Logger('roland.extensions')


class Extension:
    """Base class for extensions; holds the owning roland instance."""

    # Extensions with a lower sort_order are meant to be set up earlier
    # (see the comments on NotificationManager and SessionManager).
    sort_order = 0

    def __init__(self, roland):
        self.roland = roland
        self.name = self.__class__.__name__

    def setup(self):
        """Setup method, for setting any state in the extension.

        If this is fatal, Roland will ignore the error.
        """
        pass

    def before_run(self):
        """Very early setup method, happens during Roland.__init__.

        Should not typically be used."""
        pass


class ClipboardManager(Extension):
    """Copies text into the primary, secondary and clipboard selections."""

    def set_text(self, text):
        """Place *text* on all three GTK selections."""
        from gi.repository import Gdk, Gtk

        selections = (
            Gdk.SELECTION_PRIMARY,
            Gdk.SELECTION_SECONDARY,
            Gdk.SELECTION_CLIPBOARD,
        )
        for selection in selections:
            Gtk.Clipboard.get(selection).set_text(text, -1)


class NotificationManager(Extension):
    """Shows desktop notifications through libnotify and logs them."""

    # allow other things to notify
    sort_order = -1

    def setup(self):
        from gi.repository import Notify

        if not Notify.is_initted():
            Notify.init('roland')

    def notify(self, message, critical, header):
        """Show a notification and log it.

        Critical notifications are logged at critical level and shown with
        critical urgency; everything else is logged at info level.
        """
        from gi.repository import Notify

        n = Notify.Notification.new(header, message)
        logger = log.info
        if critical:
            logger = log.critical
            n.set_urgency(Notify.Urgency.CRITICAL)

        logger('{}: {}', header, message)
        n.show()


class HistoryManager(Extension):
    """Records per-profile URL view counts in a SQLite database."""

    def setup(self):
        self.create_history_db()

    def create_history_db(self):
        """Create the history table if it does not exist yet."""
        # closing() guarantees the connection is released; the inner
        # ``conn`` context commits on success and rolls back on error.
        with contextlib.closing(self.get_history_db()) as conn, conn:
            conn.execute('create table if not exists history '
                         '(url text, view_count integer)')

    def get_history_db(self):
        """Open a new connection to this profile's history database."""
        return sqlite3.connect(
            config_path('history.{}.db', self.roland.profile))

    def update(self, url):
        """Bump the view count for *url*, inserting it on first visit.

        Always returns False; 'about:blank' is never recorded.
        """
        if url == 'about:blank':
            return False

        with contextlib.closing(self.get_history_db()) as conn, conn:
            cursor = conn.cursor()
            cursor.execute('select url from history where url = ?', (url,))
            if cursor.fetchone() is None:
                cursor.execute('insert into history (url, view_count)'
                               'values (?, 1)', (url,))
            else:
                cursor.execute(
                    'update history set view_count = view_count + 1 '
                    'where url = ?', (url,))
        return False

    def most_popular_urls(self):
        """Return up to 500 URLs, most viewed first."""
        with contextlib.closing(self.get_history_db()) as conn:
            rows = conn.execute(
                'select url from history '
                'order by view_count desc limit 500').fetchall()
        return [url for (url,) in rows]


class DownloadManager(Extension):
    """Tracks WebKit downloads and saves them under ``save_location``."""

    save_location = os.path.expanduser('~/Downloads/')

    def setup(self):
        self.roland.downloads = {}
        context = WebKit2.WebContext.get_default()
        context.connect('download-started', self.download_started)

    def download_started(self, webcontext, download):
        download.connect('failed', self.failed)
        download.connect('finished', self.finished)
        download.connect('decide-destination', self.decide_destination)
        download.connect('created-destination', self.created_destination)

    def created_destination(self, download, destination):
        self.roland.notify("Downloading {}".format(destination))

    def decide_destination(self, download, suggested_filename):
        """Pick a non-clobbering path for the download and register it.

        Returns True to tell WebKit the destination has been handled.
        """
        # The suggested name originates from the remote server; keep only
        # its final component so it cannot escape save_location.
        filename = os.path.basename(suggested_filename) or 'download'
        orig_save_path = os.path.join(self.save_location, filename)

        save_path = orig_save_path
        for i in itertools.count(1):
            if not os.path.exists(save_path):
                break
            save_path = orig_save_path + ('.%d' % i)

        download.set_destination('file://' + save_path)
        self.roland.downloads[save_path] = download
        return True

    @staticmethod
    def _local_path(download):
        """Return the download's destination without its file:// scheme.

        Yields '' when no destination has been set (e.g. an early failure).
        """
        destination = download.get_destination() or ''
        if destination.startswith('file://'):
            return destination[len('file://'):]
        return destination

    def failed(self, download, error):
        location = self._local_path(download)
        if error == WebKit2.DownloadError.CANCELLED_BY_USER:
            self.roland.notify('Download cancelled: %s' % location)
        else:
            self.roland.notify(
                'Download failed: %s' % location, critical=True)
        # The entry may be absent if the download failed before a
        # destination was decided; that must not raise from a signal handler.
        self.roland.downloads.pop(location, None)

    def finished(self, download):
        location = self._local_path(download)
        self.roland.notify('Download finished: %s' % location)
        self.roland.downloads.pop(location, None)


class CookieManager(Extension):
    """Persists cookies for the profile in a SQLite cookie jar."""

    def setup(self):
        cookiejar_path = config_path('cookies.{}.db', self.roland.profile)
        cookiejar = WebKit2.WebContext.get_default().get_cookie_manager()

        cookiejar.set_accept_policy(WebKit2.CookieAcceptPolicy.ALWAYS)
        cookiejar.set_persistent_storage(
            cookiejar_path, WebKit2.CookiePersistentStorage.SQLITE)


class SessionManager(Extension):
    """Restores the saved session on startup and saves it on shutdown."""

    # Make SessionManager load up last, because it needs to do things after
    # TLSErrorByPassExtension has setup exclusions.
    sort_order = 1

    def _session_path(self):
        """Path of this profile's session file."""
        return config_path('session.{}.json', self.roland.profile)

    def setup(self):
        try:
            with open(self._session_path(), 'r') as f:
                session = json.load(f)
        except FileNotFoundError:
            pass
        except Exception as e:
            self.roland.notify("Error loading session: {}".format(e))
        else:
            lazy = getattr(self.roland.config, 'lazy_tabs', True)

            for index, page in enumerate(session):
                self.roland.new_window(
                    url=page['uri'],
                    title=page.get('title'),
                    session=page.get('session'),
                    # The first window is always loaded eagerly.
                    lazy=False if index == 0 else lazy,
                )

        self.roland.connect('shutdown', self.on_shutdown)

    def on_shutdown(self, app):
        self.save_session()

    def save_session(self):
        """Write every open browser's URI, title and state to disk."""
        session = []

        for browser in self.roland.get_browsers():
            if browser.lazy:
                single_session = browser.lazy_session
                uri = browser.lazy_uri
            else:
                single_session = browser.get_serialised_session_state()
                uri = browser.webview.get_uri()

            # Skip windows that have nothing worth restoring.
            if uri in (None, 'about:blank', 'http:/'):
                continue

            session.append({
                'uri': uri,
                'title': browser.get_title() or 'No Title',
                'session': single_session,
            })

        with open(self._session_path(), 'w') as f:
            json.dump(session, f, indent=4)


class TLSErrorByPassExtension(Extension):
    """Keeps per-host certificate exclusions for TLS errors."""

    def setup(self):
        """Create the tls directories and re-apply saved exclusions."""
        cert_bypass_path = config_path(
            'tls.{}/bypass/'.format(self.roland.profile))
        cert_error_path = config_path(
            'tls.{}/error/'.format(self.roland.profile))

        os.makedirs(cert_bypass_path, exist_ok=True)
        os.makedirs(cert_error_path, exist_ok=True)

        context = WebKit2.WebContext.get_default()
        # Each file in the bypass directory is named after its host and
        # holds that host's PEM certificate.
        for host in os.listdir(cert_bypass_path):
            with open(os.path.join(cert_bypass_path, host)) as f:
                certificate = f.read()

            certificate = Gio.TlsCertificate.new_from_pem(
                certificate, len(certificate))
            context.allow_tls_certificate_for_host(certificate, host)

    def bypass(self, host):
        """Add a certificate exclusion for *host*.

        Uses the certificate saved in the error directory when there is
        one; otherwise connects to the host on port 443 without validation
        to download it.
        """
        cert_error_path = config_path(
            'tls.{}/error/{}'.format(self.roland.profile, host))
        cert_bypass_path = config_path(
            'tls.{}/bypass/{}'.format(self.roland.profile, host))

        context = WebKit2.WebContext.get_default()

        try:
            with open(cert_error_path) as f:
                certificate = f.read()
        except FileNotFoundError:
            pass
        else:
            with open(cert_bypass_path, 'w') as f:
                f.write(certificate)

            certificate = Gio.TlsCertificate.new_from_pem(
                certificate, len(certificate))
            context.allow_tls_certificate_for_host(certificate, host)
            self.roland.notify(
                "Certificate exclusion added for {}".format(host))
            return

        client = Gio.SocketClient.new()
        # no validation at all, needed to download the bad certificate.
        client.set_tls_validation_flags(0)
        client.set_tls(True)

        def callback(client, task):
            # This runs from the main loop, so a failed connection has to
            # be reported here rather than raised into the loop.
            try:
                connection = client.connect_finish(task)
            except Exception as e:
                self.roland.notify(
                    "Could not fetch certificate for {}: {}".format(host, e),
                    critical=True)
                return

            tls_connection = connection.get_base_io_stream()
            certificate = tls_connection.get_peer_certificate()
            context.allow_tls_certificate_for_host(certificate, host)

            with open(cert_bypass_path, 'w') as f:
                f.write(certificate.props.certificate_pem)
            self.roland.notify(
                "Certificate exclusion added for {}".format(host))

        client.connect_to_host_async(host, 443, None, callback)


class HSTSExtension(Extension):
    """Stores HSTS hosts and answers whether a URL must use HTTPS.

    Entries are keyed by host name; a leading '.' marks an entry that also
    covers subdomains.
    """

    def setup(self):
        self.create_hsts_db()

    def create_hsts_db(self):
        """Create the hsts table, seeding it when it is newly created."""
        conn = self.get_hsts_db()
        try:
            try:
                conn.execute('create table hsts '
                             '(domain text unique, expiry timestamp)')
            except sqlite3.OperationalError:
                return  # already exists
            conn.commit()
            # Only a freshly created database gets the preload list.
            self.create_initial_db()
        finally:
            conn.close()

    def create_initial_db(self):
        """Download Chromium's preload list and store its force-https hosts."""
        self.roland.notify(
            "You don't have any HSTS entries. "
            "Downloading Chromium's preload list.")

        raw = request.urlopen('https://raw.githubusercontent.com/scheib/chromium/master/net/http/transport_security_state_static.json').read().decode('utf8')

        # JSON with comments? wild
        hsts = json.loads(re.sub(r'^ *?//.*$', '', raw, flags=re.MULTILINE))

        # Preloaded entries carry no max-age, so give them a year.
        expiry = datetime.datetime.now() + datetime.timedelta(days=365)

        entries = []
        for entry in hsts['entries']:
            if entry.get('mode') != 'force-https':
                continue
            domain = entry['name']
            if entry.get('include_subdomains'):
                domain = '.' + domain
            entries.append((domain, expiry))

        with contextlib.closing(self.get_hsts_db()) as conn, conn:
            conn.executemany('insert into hsts (domain, expiry) '
                             'values (?, ?)', entries)

    def get_hsts_db(self):
        """Open a new connection to this profile's HSTS database."""
        return sqlite3.connect(
            config_path('hsts.{}.db', self.roland.profile),
            detect_types=sqlite3.PARSE_DECLTYPES)

    @staticmethod
    def _host(uri):
        """Return the lower-cased host of *uri* without port or userinfo.

        HSTS applies to a host regardless of port, so the port must not be
        part of the stored or looked-up key. Returns '' for host-less URIs.
        """
        return urlparse.urlparse(uri).hostname or ''

    def add_entry(self, uri, hsts_header):
        """Record the Strict-Transport-Security header sent by *uri*.

        Malformed headers (missing or non-numeric max-age) are logged and
        ignored, since the header comes from the remote server.
        """
        parsed = parse_dict_header(hsts_header)
        try:
            max_age, *rest = parsed['max-age'].split(';', 1)
            max_age = int(max_age)
        except (KeyError, AttributeError, ValueError):
            log.warning('Ignoring malformed HSTS header for {}: {!r}',
                        uri, hsts_header)
            return

        include_subdomains = False
        if rest:
            include_subdomains = 'includesubdomains' in rest[0].lower()

        domain = self._host(uri)
        if not domain:
            return
        if include_subdomains:
            domain = '.' + domain

        expiry = datetime.datetime.now() + datetime.timedelta(seconds=max_age)

        with contextlib.closing(self.get_hsts_db()) as conn, conn:
            conn.execute('insert or replace into hsts (domain, expiry) '
                         'values (?, ?)', (domain, expiry))

    def check_url(self, uri):
        """Return True when an unexpired HSTS entry covers *uri*'s host."""
        domain = self._host(uri)

        candidates = [
            domain,  # straight domain match, e.g. lastpass.com
        ]
        if '.' in domain:
            # match base domain for a domain supporting subdomains,
            # e.g. keyerror.com
            candidates.append('.' + domain)
            # subdomain match at any depth, e.g. foo.keyerror.com and
            # a.foo.keyerror.com are both covered by .keyerror.com
            labels = domain.split('.')
            candidates.extend(
                '.' + '.'.join(labels[i:]) for i in range(1, len(labels)))

        placeholders = ','.join('?' for _ in candidates)
        with contextlib.closing(self.get_hsts_db()) as conn:
            rows = conn.execute(
                'select expiry from hsts '
                'where domain in ({})'.format(placeholders),
                candidates).fetchall()

        # Any live entry is enough; an expired row must not hide a valid one.
        now = datetime.datetime.now()
        for (expiry,) in rows:
            if expiry is not None and now <= expiry:
                log.info("HSTS for {} is True", uri)
                return True

        log.info("HSTS for {} is False", uri)
        return False


class UserContentManager(Extension):
    """Loads the profile's user stylesheet and script into WebKit."""

    @staticmethod
    def _read_or_empty(path):
        """Return the contents of *path*, or '' when it does not exist."""
        try:
            with open(path) as f:
                return f.read()
        except FileNotFoundError:
            return ''

    def setup(self):
        stylesheet = self._read_or_empty(
            config_path('stylesheet.{}.css', self.roland.profile))
        script = self._read_or_empty(
            config_path('script.{}.js', self.roland.profile))

        self.script = WebKit2.UserScript.new(
            script,
            WebKit2.UserContentInjectedFrames.ALL_FRAMES,
            WebKit2.UserScriptInjectionTime.END,
            None,
            None
        )

        self.stylesheet = WebKit2.UserStyleSheet.new(
            stylesheet,
            WebKit2.UserContentInjectedFrames.ALL_FRAMES,
            WebKit2.UserStyleLevel.USER,
            None,
            None
        )

        self.manager = WebKit2.UserContentManager.new()
        self.manager.add_script(self.script)
        self.manager.add_style_sheet(self.stylesheet)


class DBusManager(Extension):
    """Exposes a small DBus API on the session bus for this profile."""

    def before_run(self):
        try:
            from dbus.mainloop.glib import DBusGMainLoop
        except ImportError:
            self.roland.notify(
                'DBus is not available. '
                'Many large parts of roland will not work.',
                critical=True)
        else:
            DBusGMainLoop(set_as_default=True)

    def setup(self):
        self.create_dbus_api()

    def create_dbus_api(self):
        """Register the DBus object and keep a reference to it."""
        import dbus
        import dbus.service

        name = 'com.deschain.roland.{}'.format(self.roland.profile)

        roland = self.roland

        class DBusAPI(dbus.service.Object):
            def __init__(self):
                bus_name = dbus.service.BusName(name, bus=dbus.SessionBus())
                dbus.service.Object.__init__(
                    self, bus_name,
                    '/com/deschain/roland/{}'.format(roland.profile))

            @dbus.service.method(name)
            def open_window(self, url, related_id=None):
                # handle request from web extension
                if isinstance(url, bytes):
                    url = url.decode('utf8')

                related = None
                if related_id:
                    related = roland.find_browser(page_id=related_id)

                if related:
                    webview = roland.new_webview(related=related.webview)
                    webview.load_uri(url)
                    roland.add_window(
                        roland.browser_view.from_webview(webview, roland))
                    webview.emit('ready-to-show')
                else:
                    # Also reached when the related browser no longer exists.
                    # FIXME: this background should be configurable
                    # This also forces new window follows to background
                    roland.new_window(url, background=True)
                return 1

            @dbus.service.method(name)
            def enter_insert(self, page_id):
                window = roland.find_browser(page_id)

                if window:
                    from roland.core import Mode
                    window.set_mode(Mode.Insert)
                return 1

        self.roland_api = DBusAPI()


class PasswordManagerExtension(Extension):
    """Stores form fills encrypted with a key derived from a master password.

    NOTE(review): the key is a single unsalted SHA-256 of the password and
    records use AES-CBC with no integrity check. Changing either would make
    existing databases unreadable, so the scheme is left as it is.
    """

    FormFill = namedtuple(
        'FormFill', 'id last_used description domain form_data')

    BS = AES.block_size
    # Domain of the marker record that initialisation stores and that
    # test_password looks for to validate a master password.
    SENTINEL_DOMAIN = '!!frozen-brains-tell-no-tales!!'
    key = None

    def setup(self):
        self.create_password_db()

    def create_password_db(self):
        """Create the managed table if it does not exist yet."""
        with contextlib.closing(self.get_password_db()) as conn, conn:
            conn.execute(
                'create table if not exists managed '
                '(id integer primary key, last_used timestamp, '
                ' description text, domain text, data text)')

    def pad(self, s):
        """Pad bytes *s* to a multiple of the block size.

        Appends N bytes of value N; a full block is added when *s* is
        already aligned.
        """
        fill = self.BS - len(s) % self.BS
        return s + bytes([fill]) * fill

    def unpad(self, s):
        """Strip the padding added by :meth:`pad`; b'' is returned as-is."""
        if not s:
            return s
        return s[:-s[-1]]

    def encrypt(self, raw):
        """Encrypt bytes *raw*; returns base64 of IV + ciphertext.

        Raises TypeError when *raw* is not a bytestring.
        """
        if not isinstance(raw, bytes):
            raise TypeError("{!r} isn't a bytestring".format(raw))
        raw = self.pad(raw)
        iv = Random.new().read(AES.block_size)
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return base64.b64encode(iv + cipher.encrypt(raw))

    def decrypt(self, enc):
        """Reverse :meth:`encrypt`; returns the plaintext bytes.

        Raises TypeError when *enc* is not a bytestring.
        """
        if not isinstance(enc, bytes):
            raise TypeError("{!r} isn't a bytestring".format(enc))
        enc = base64.b64decode(enc)
        iv = enc[:self.BS]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return self.unpad(cipher.decrypt(enc[self.BS:]))

    def attempt_initialise(self, window):
        """Ensure a master password exists, prompting to create one.

        Returns True when the database already has records or a password
        was just set, False when the user cancelled a prompt.
        """
        # Finish with the connection before prompting, so it is not held
        # open while the user is typing.
        with contextlib.closing(self.get_password_db()) as db:
            (count,) = db.execute('select count(*) from managed').fetchone()

        # at least one record has been created, i.e. a password exists
        if count > 0:
            return True

        while True:
            password = window.entry_line.blocking_prompt(
                prompt='Enter password (pick a good one)',
                private=True
            )
            confirm = window.entry_line.blocking_prompt(
                prompt='Confirm password',
                private=True
            )

            if password is None or confirm is None:
                return False
            if password == confirm:
                self.key = hashlib.sha256(password.encode('utf8')).digest()
                # The marker record lets later unlocks validate the password.
                self.save_form(self.SENTINEL_DOMAIN, {})
                return True
            self.roland.notify("Password doesn't match confirmation")

    def unlock(self, window):
        """Make sure ``self.key`` is set, prompting up to three times.

        Raises ValueError when the database is not initialised or the
        password could not be confirmed.
        """
        if not self.attempt_initialise(window):
            raise ValueError('Database not initialised')

        if self.key is not None:
            return

        for _ in range(3):
            password = window.entry_line.blocking_prompt(
                prompt='Master password',
                private=True
            )

            if password is None:
                break

            try:
                self.test_password(password)
            except ValueError:
                self.roland.notify('Incorrect password')
            else:
                self.key = hashlib.sha256(password.encode('utf8')).digest()
                return
        raise ValueError('Could not unlock database')

    def test_password(self, password):
        """Raise ValueError unless *password* decrypts the marker record.

        The key is only set for the duration of the check and is always
        reset to None afterwards.
        """
        self.key = hashlib.sha256(password.encode('utf8')).digest()
        try:
            if not self.get_for_domain(self.SENTINEL_DOMAIN.encode('ascii')):
                raise ValueError('Incorrect password')
        finally:
            self.key = None

    def get_for_domain(self, domain):
        """Return the FormFill records for *domain*, most recently used first.

        *domain* must be bytes, because it is compared with decrypted
        values; TypeError is raised otherwise.
        """
        if not isinstance(domain, bytes):
            raise TypeError("{!r} isn't a bytestring".format(domain))

        with contextlib.closing(self.get_password_db()) as db:
            records = db.execute(
                'select id, last_used, description, domain, data from '
                'managed order by last_used desc').fetchall()

        matches = []
        for (record_id, last_used, encrypted_description,
             encrypted_domain, data) in records:
            # Domains are stored encrypted, so every record is decrypted
            # (once) to find the ones that match.
            if self.decrypt(encrypted_domain) != domain:
                continue
            matches.append(self.FormFill(
                record_id, last_used,
                self.decrypt(encrypted_description),
                domain,
                msgpack.loads(self.decrypt(data))))
        return matches

    def save_form(self, domain, form, description=None):
        """Encrypt and insert a form fill for *domain* (a str)."""
        if description is None:
            description = 'Form for {}'.format(domain)

        record = (
            datetime.datetime.now(),
            self.encrypt(description.encode('utf8')),
            self.encrypt(domain.encode('utf8')),
            self.encrypt(msgpack.dumps(form)),
        )

        with contextlib.closing(self.get_password_db()) as db, db:
            db.execute(
                'insert into managed (last_used, description, domain, data) '
                'values (?, ?, ?, ?)', record)

    def get_password_db(self):
        """Open a new connection to this profile's password database."""
        return sqlite3.connect(
            config_path('passwords.{}.db', self.roland.profile),
            detect_types=sqlite3.PARSE_DECLTYPES)

    def update_last_used(self, record_id):
        """Set the record's last_used timestamp to now."""
        with contextlib.closing(self.get_password_db()) as db, db:
            db.execute('update managed set last_used = ? where id = ?',
                       (datetime.datetime.now(), record_id))