#!/usr/bin/python3 import subprocess import argparse import select import socket import struct import os import errno import shutil import re import sys import fcntl import signal from fnmatch import fnmatch # Good enough for now :/. # TODO(ish): # # Make assorted detection hacks cleaner. # Profile and optimize. # Consider reimplmenting in perl or C. # Produce more useful error messages :P. CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf' LOCKFILE = os.getenv('HOME')+'/.config/keyd/app.lock' LOGFILE = os.getenv('HOME')+'/.config/keyd/app.log' KEYD_BIN = os.environ.get('KEYD_BIN', 'keyd') debug_flag = os.getenv('KEYD_DEBUG') def dbg(s): if debug_flag: print(s) def die(msg): sys.stderr.write('ERROR: ') sys.stderr.write(msg) sys.stderr.write('\n') exit(0) def assert_env(var): if not os.getenv(var): raise Exception(f'Missing environment variable {var}') def assert_gnome(): if 'gnome' not in os.getenv('XDG_CURRENT_DESKTOP', '').lower() and \ not os.getenv('GNOME_SETUP_DISPLAY'): raise Exception(f'Gnome desktop environment not detected by inspecting' 'XDG_CURRENT_DESKTOP and GNOME_SETUP_DISPLAY environment variables') def run(cmd): return subprocess.check_output(['/bin/sh', '-c', cmd]).decode('utf8') def run_or_die(cmd, msg=''): rc = subprocess.run(['/bin/sh', '-c', cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode if rc != 0: die(msg) def parse_config(path): config = [] for line in open(path): line = line.strip() if line.startswith('[') and line.endswith(']'): a = line[1:-1].split('|') if len(a) < 2: cls = a[0] title = '*' else: cls = a[0] title = a[1] bindings = [] config.append((cls, title, bindings)) elif line == '': continue elif line.startswith('#'): continue else: bindings.append(line) return config def new_interruptible_generator(fd, event_fn, flushed_fn = None): intr, intw = os.pipe() def handler(s, _): os.write(intw, b'i') signal.signal(signal.SIGUSR1, handler) while True: r,_,_ = select.select([fd, intr], [], []) if intr in r: os.read(intr, 1) yield None if fd in r: if flushed_fn: while not flushed_fn(): yield event_fn() else: yield event_fn() class KDE(): def __init__(self, on_window_change): import os import dbus import dbus.mainloop.glib assert_env("KDE_SESSION_VERSION") self.on_window_change = on_window_change dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) # Inject the kwin script def init(self): import dbus if os.getenv('KDE_SESSION_VERSION') == '6': api = 'windowActivated' else: api = 'clientActivated' kwin_script = '''workspace.%s.connect(client => { if (!client) return; callDBus("org.rvaiya.keyd", "/listener", "", "updateWindow", client.caption, client.resourceClass, client.resourceName); }); ''' % api f = open(f'/tmp/keyd-kwin-{os.getuid()}.js', 'w') f.write(kwin_script) f.close() bus = dbus.SessionBus() kwin = KDE.get_kwin(bus) kwin.unloadScript(f.name) num = kwin.loadScript(f.name) if os.getenv('KDE_SESSION_VERSION') == '6': script_object = f'/Scripting/Script{num}' else: script_object = f'/{num}' script = bus.get_object('org.kde.KWin', script_object) script.run() @staticmethod def get_kwin(bus): import dbus import time # Give KDE time to initialize the dbus service # (allows use in autostart script) last_err = None for _ in range(5): try: return bus.get_object('org.kde.KWin', '/Scripting') except dbus.exceptions.DBusException as e: time.sleep(1) last_err = e if last_err is not None: raise last_err def run(self): import dbus.service import gi.repository.GLib on_window_change = self.on_window_change class Listener(dbus.service.Object): def __init__(self): super().__init__(dbus.service.BusName('org.rvaiya.keyd', dbus.SessionBus()), '/listener') @dbus.service.method('org.rvaiya.keyd') def updateWindow(self, title, klass, id): on_window_change(klass, title) Listener() gi.repository.GLib.MainLoop().run() # Just enough wayland wire protocol to listen for interesting events. # # Sadly most of the useful protocols haven't been standardized, # so this only works for wlroots based compositors :(. class Wayland(): def __init__(self, *interfaces): path = os.getenv("WAYLAND_DISPLAY") if path == None: raise Exception("WAYLAND_DISPLAY not set (is wayland running?)") if path[0] != '/': path = os.getenv("XDG_RUNTIME_DIR") + "/" + path self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.connect(path) self._bind_interfaces(interfaces) def send_msg(self, object_id, opcode, payload): opcode |= (len(payload)+8) << 16 self.sock.sendall(struct.pack(b'II', object_id, opcode)) self.sock.sendall(payload) def recv_msg(self): (object_id, evcode) = struct.unpack('II', self.sock.recv(8)) size = evcode >> 16 evcode = evcode & 0xFFFF data = self.sock.recv(size-8) return (object_id, evcode, data) def read_string(self, b): return b[4:4+struct.unpack('I', b[:4])[0]-1].decode('utf8') # Will bind the requested interface to object id 4 def _bind_interfaces(self, interfaces): # bind the registry object to id 2 self.send_msg(1, 1, b'\x02\x00\x00\x00') # solicit a sync message to bookend the registry events self.send_msg(1, 0, b'\x03\x00\x00\x00') interface_object_number = 4 while True: (obj, event, payload) = self.recv_msg() if obj == 2 and event == 0: # registry.global event wl_name = struct.unpack('I', payload[0:4])[0] wl_interface = self.read_string(payload[4:]) for interface in interfaces: if wl_interface == interface: bind_payload = payload + struct.pack('I', interface_object_number) self.send_msg(2, 0, bind_payload) setattr(self, interface, interface_object_number) interface_object_number += 1 if obj == 3: # sync message for interface in interfaces: if not hasattr(self, interface): raise Exception(f"Could not find interface {interface}") return class Wlroots(): def __init__(self, on_window_change): self.wl = Wayland('zwlr_foreign_toplevel_manager_v1') self.on_window_change = on_window_change def init(self): pass def run(self): windows = {} active_window = None for msg in new_interruptible_generator(self.wl.sock, self.wl.recv_msg): if msg == None: self.on_window_change(active_window['appid'], active_window['title']) continue (obj, event, payload) = msg if obj == self.wl.zwlr_foreign_toplevel_manager_v1 and event == 0: # zwlr_foreign_toplevel_manager_v1::toplevel event windows[struct.unpack('I', payload)[0]] = {'title': '', 'appid': ''} continue elif obj in windows: if event == 0: # zwlr_foreign_toplevel_handle_v1::title event windows[obj]['title'] = self.wl.read_string(payload) elif event == 1: # zwlr_foreign_toplevel_handle_v1::app_id event windows[obj]['appid'] = self.wl.read_string(payload) elif event == 4: # zwlr_foreign_toplevel_handle_v1::state event if active_window == windows[obj]: active_window = None window_is_active = False array_size = struct.unpack('I', payload[0:4])[0] for i in range(0, array_size, 4): start_offset = i + 4 end_offset = start_offset + 4 state = struct.unpack('I', payload[start_offset:end_offset])[0] # zwlr_foreign_toplevel_handle_v1::state enum -> activated if state == 2: window_is_active = True if window_is_active: active_window = windows[obj] elif event == 5 and active_window == windows[obj]: # zwlr_foreign_toplevel_handle_v1::done event self.on_window_change(active_window['appid'], active_window['title']) elif event == 6: # zwlr_foreign_toplevel_handle_v1::closed event closed_window = windows.pop(obj) if closed_window == active_window: active_window = None class Cosmic(): def __init__(self, on_window_change): self.wl = Wayland('zcosmic_toplevel_info_v1') self.on_window_change = on_window_change def init(self): pass def run(self): windows = {} while True: (obj, event, payload) = self.wl.recv_msg() if obj not in windows: windows[obj]={} if event == 2: windows[obj]['title'] = self.wl.read_string(payload) if event == 3: windows[obj]['appid'] = self.wl.read_string(payload) if event == 8 and payload[0] > 0 and payload[4] == 2: self.on_window_change(windows[obj].get('appid', ''), windows[obj].get('title', '')) class XMonitor(): def __init__(self, on_window_change): assert_env('DISPLAY') self.on_window_change = on_window_change def init(self): import Xlib import Xlib.display self.dpy = Xlib.display.Display() self.dpy.screen().root.change_attributes( event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask) self._NET_WM_NAME = self.dpy.intern_atom('_NET_WM_NAME') self.WM_NAME = self.dpy.intern_atom('WM_NAME') def get_window_info(self, win): def get_title(win): title = '' try: title = win.get_full_property(self._NET_WM_NAME, 0).value.decode('utf8') except: try: title = win.get_full_property(self.WM_NAME, 0).value.decode('latin1', 'replace') except: pass return title while win: cls = win.get_wm_class() if cls: return (cls[1], get_title(win)) win = win.query_tree().parent return ("root", "") def run(self): import Xlib last_active_class = "" last_active_title = "" _NET_WM_STATE = self.dpy.intern_atom('_NET_WM_STATE', False) _NET_WM_STATE_ABOVE = self.dpy.intern_atom('_NET_WM_STATE_ABOVE', False) _NET_WM_WINDOW_TYPE_NOTIFICATION = self.dpy.intern_atom('_NET_WM_WINDOW_TYPE_NOTIFICATION', False) _NET_WM_WINDOW_TYPE = self.dpy.intern_atom('_NET_WM_WINDOW_TYPE', False) def get_floating_window(): q = [self.dpy.screen().root] while q: w = q.pop() q.extend(w.query_tree().children) v = w.get_full_property(_NET_WM_STATE, Xlib.Xatom.ATOM) if v and v.value and v.value[0] == _NET_WM_STATE_ABOVE: types = w.get_full_property(_NET_WM_WINDOW_TYPE, Xlib.Xatom.ATOM) # Ignore persistent notification windows like dunst if not types or _NET_WM_WINDOW_TYPE_NOTIFICATION not in types.value: return w return None def get_active_window(): win = get_floating_window() if win != None: return win return self.dpy.get_input_focus().focus for ev in new_interruptible_generator(self.dpy.fileno(), self.dpy.next_event, lambda: not self.dpy.pending_events()): if ev == None: self.on_window_change(last_active_class, last_active_title) else: try: win = get_active_window() if isinstance(win, int) or win == None: continue win.change_attributes(event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask) cls, title = self.get_window_info(win) if cls != last_active_class or title != last_active_title: last_active_class = cls last_active_title = title self.on_window_change(cls, title) except: pass # :( class GnomeMonitor(): def __init__(self, on_window_change): assert_gnome() self.on_window_change = on_window_change self.fifo_path = (os.getenv('XDG_RUNTIME_DIR') or '/run/user/'+str(os.getuid())) + '/keyd.fifo' def init(self): if not os.path.exists(self.fifo_path): print("""Gnome extension doesn't appear to be running: You will need to install the keyd gnome extension in order for the application mapper to work correctly. This can usually be achieved by running: rm -r ~/.local/share/gnome-shell/extensions/keyd # Remove any older versions of the extension mkdir -p ~/.local/share/gnome-shell/extensions Followed by: Gnome 42-44: ln -s /usr/local/share/keyd/gnome-extension ~/.local/share/gnome-shell/extensions/keyd Gnome 45/46: ln -s /usr/local/share/keyd/gnome-extension-45 ~/.local/share/gnome-shell/extensions/keyd Finally restart Gnome and run: gnome-extensions enable keyd gnome-extensions show keyd (verify the extension is enabled) NOTE: You may need to adjust the above paths (e.g /usr/share/keyd/gnome-extension) depending on your distro. """) exit(0) def run(self): fh = open(self.fifo_path) last_cls = '' last_title = '' for line in new_interruptible_generator(fh.fileno(), fh.readline, None): if line == None: self.on_window_change(last_cls, last_title) continue try: (cls, title) = line.strip('\n').split('\t') last_cls = cls last_title = title except: cls = '' title = '' self.on_window_change(cls, title) def get_monitor(on_window_change): monitors = [ ('kde', KDE), ('wlroots', Wlroots), ('cosmic', Cosmic), ('Gnome', GnomeMonitor), ('X', XMonitor), ] for name, mon in monitors: try: m = mon(on_window_change) print(f'{name} detected') return m except: pass print('Could not detect app environment :(.') sys.exit(-1) def lock(): global lockfh lockfh = open(LOCKFILE, 'w') try: fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB) except: die('only one instance may run at a time') def daemonize(): print(f'Daemonizing, log output will be stored in {LOGFILE}...') fh = open(LOGFILE, 'w') os.close(1) os.close(2) os.dup2(fh.fileno(), 1) os.dup2(fh.fileno(), 2) if os.fork(): exit(0) if os.fork(): exit(0) opt = argparse.ArgumentParser() opt.add_argument('-v', '--verbose', default=False, action='store_true', help='Log the active window (useful for discovering window and class names)') opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background') args = opt.parse_args() if not os.path.exists(CONFIG_PATH): die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf') config = parse_config(CONFIG_PATH) lock() def lookup_bindings(cls, title): bindings = [] for cexp, texp, b in config: if fnmatch(cls, cexp) and fnmatch(title, texp): dbg(f'\tMatched {cexp}|{texp}') bindings.extend(b) return bindings def normalize_class(s): return re.sub('[^A-Za-z0-9]+', '-', s).strip('-').lower() def normalize_title(s): return re.sub(r'[\W_]+', '-', s).strip('-').lower() last_mtime = os.path.getmtime(CONFIG_PATH) def on_window_change(cls, title): global last_mtime global config cls = normalize_class(cls) title = normalize_title(title) mtime = os.path.getmtime(CONFIG_PATH) if mtime != last_mtime: print(CONFIG_PATH + ': Updated, reloading config...') config = parse_config(CONFIG_PATH) last_mtime = mtime if args.verbose: print(f'Active window: {cls}|{title}') bindings = lookup_bindings(cls, title) subprocess.run([KEYD_BIN, 'bind', 'reset', *bindings], stdout=subprocess.DEVNULL) mon = get_monitor(on_window_change) mon.init() if args.daemonize: daemonize() mon.run()