589 lines
18 KiB
Python
Executable file
589 lines
18 KiB
Python
Executable file
#!/usr/bin/python3
|
|
|
|
import subprocess
|
|
import argparse
|
|
import select
|
|
import socket
|
|
import struct
|
|
import os
|
|
import errno
|
|
import shutil
|
|
import re
|
|
import sys
|
|
import fcntl
|
|
import signal
|
|
from fnmatch import fnmatch
|
|
|
|
# Good enough for now :/.
|
|
|
|
# TODO(ish):
|
|
#
|
|
# Make assorted detection hacks cleaner.
|
|
# Profile and optimize.
|
|
# Consider reimplmenting in perl or C.
|
|
# Produce more useful error messages :P.
|
|
|
|
CONFIG_PATH = os.getenv('HOME')+'/.config/keyd/app.conf'
|
|
LOCKFILE = os.getenv('HOME')+'/.config/keyd/app.lock'
|
|
LOGFILE = os.getenv('HOME')+'/.config/keyd/app.log'
|
|
|
|
KEYD_BIN = os.environ.get('KEYD_BIN', 'keyd')
|
|
|
|
debug_flag = os.getenv('KEYD_DEBUG')
|
|
|
|
def dbg(s):
|
|
if debug_flag:
|
|
print(s)
|
|
|
|
def die(msg):
|
|
sys.stderr.write('ERROR: ')
|
|
sys.stderr.write(msg)
|
|
sys.stderr.write('\n')
|
|
exit(0)
|
|
|
|
def assert_env(var):
|
|
if not os.getenv(var):
|
|
raise Exception(f'Missing environment variable {var}')
|
|
|
|
def assert_gnome():
|
|
if 'gnome' not in os.getenv('XDG_CURRENT_DESKTOP', '').lower() and \
|
|
not os.getenv('GNOME_SETUP_DISPLAY'):
|
|
raise Exception(f'Gnome desktop environment not detected by inspecting'
|
|
'XDG_CURRENT_DESKTOP and GNOME_SETUP_DISPLAY environment variables')
|
|
|
|
def run(cmd):
|
|
return subprocess.check_output(['/bin/sh', '-c', cmd]).decode('utf8')
|
|
|
|
def run_or_die(cmd, msg=''):
|
|
rc = subprocess.run(['/bin/sh', '-c', cmd],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL).returncode
|
|
|
|
if rc != 0:
|
|
die(msg)
|
|
|
|
def parse_config(path):
|
|
config = []
|
|
|
|
for line in open(path):
|
|
line = line.strip()
|
|
|
|
if line.startswith('[') and line.endswith(']'):
|
|
a = line[1:-1].split('|')
|
|
|
|
if len(a) < 2:
|
|
cls = a[0]
|
|
title = '*'
|
|
else:
|
|
cls = a[0]
|
|
title = a[1]
|
|
|
|
bindings = []
|
|
config.append((cls, title, bindings))
|
|
elif line == '':
|
|
continue
|
|
elif line.startswith('#'):
|
|
continue
|
|
else:
|
|
bindings.append(line)
|
|
|
|
return config
|
|
|
|
def new_interruptible_generator(fd, event_fn, flushed_fn = None):
|
|
intr, intw = os.pipe()
|
|
|
|
def handler(s, _):
|
|
os.write(intw, b'i')
|
|
|
|
signal.signal(signal.SIGUSR1, handler)
|
|
|
|
while True:
|
|
r,_,_ = select.select([fd, intr], [], [])
|
|
|
|
if intr in r:
|
|
os.read(intr, 1)
|
|
yield None
|
|
if fd in r:
|
|
if flushed_fn:
|
|
while not flushed_fn():
|
|
yield event_fn()
|
|
else:
|
|
yield event_fn()
|
|
|
|
class KDE():
|
|
def __init__(self, on_window_change):
|
|
import os
|
|
import dbus
|
|
import dbus.mainloop.glib
|
|
|
|
assert_env("KDE_SESSION_VERSION")
|
|
|
|
self.on_window_change = on_window_change
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
# Inject the kwin script
|
|
def init(self):
|
|
import dbus
|
|
|
|
if os.getenv('KDE_SESSION_VERSION') == '6':
|
|
api = 'windowActivated'
|
|
else:
|
|
api = 'clientActivated'
|
|
|
|
kwin_script = '''workspace.%s.connect(client => {
|
|
if (!client) return;
|
|
callDBus("org.rvaiya.keyd", "/listener", "", "updateWindow", client.caption, client.resourceClass, client.resourceName);
|
|
});
|
|
''' % api
|
|
|
|
f = open(f'/tmp/keyd-kwin-{os.getuid()}.js', 'w')
|
|
f.write(kwin_script)
|
|
f.close()
|
|
|
|
bus = dbus.SessionBus()
|
|
|
|
kwin = KDE.get_kwin(bus)
|
|
|
|
kwin.unloadScript(f.name)
|
|
num = kwin.loadScript(f.name)
|
|
|
|
if os.getenv('KDE_SESSION_VERSION') == '6':
|
|
script_object = f'/Scripting/Script{num}'
|
|
else:
|
|
script_object = f'/{num}'
|
|
|
|
script = bus.get_object('org.kde.KWin', script_object)
|
|
script.run()
|
|
|
|
@staticmethod
|
|
def get_kwin(bus):
|
|
import dbus
|
|
import time
|
|
|
|
# Give KDE time to initialize the dbus service
|
|
# (allows use in autostart script)
|
|
last_err = None
|
|
for _ in range(5):
|
|
try:
|
|
return bus.get_object('org.kde.KWin', '/Scripting')
|
|
except dbus.exceptions.DBusException as e:
|
|
time.sleep(1)
|
|
last_err = e
|
|
|
|
if last_err is not None:
|
|
raise last_err
|
|
|
|
def run(self):
|
|
import dbus.service
|
|
import gi.repository.GLib
|
|
|
|
on_window_change = self.on_window_change
|
|
class Listener(dbus.service.Object):
|
|
def __init__(self):
|
|
super().__init__(dbus.service.BusName('org.rvaiya.keyd', dbus.SessionBus()), '/listener')
|
|
|
|
@dbus.service.method('org.rvaiya.keyd')
|
|
def updateWindow(self, title, klass, id):
|
|
on_window_change(klass, title)
|
|
|
|
Listener()
|
|
|
|
gi.repository.GLib.MainLoop().run()
|
|
|
|
# Just enough wayland wire protocol to listen for interesting events.
|
|
#
|
|
# Sadly most of the useful protocols haven't been standardized,
|
|
# so this only works for wlroots based compositors :(.
|
|
|
|
class Wayland():
|
|
def __init__(self, *interfaces):
|
|
path = os.getenv("WAYLAND_DISPLAY")
|
|
if path == None:
|
|
raise Exception("WAYLAND_DISPLAY not set (is wayland running?)")
|
|
|
|
if path[0] != '/':
|
|
path = os.getenv("XDG_RUNTIME_DIR") + "/" + path
|
|
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self.sock.connect(path)
|
|
|
|
self._bind_interfaces(interfaces)
|
|
|
|
def send_msg(self, object_id, opcode, payload):
|
|
opcode |= (len(payload)+8) << 16
|
|
|
|
self.sock.sendall(struct.pack(b'II', object_id, opcode))
|
|
self.sock.sendall(payload)
|
|
|
|
def recv_msg(self):
|
|
(object_id, evcode) = struct.unpack('II', self.sock.recv(8))
|
|
|
|
size = evcode >> 16
|
|
evcode = evcode & 0xFFFF
|
|
|
|
data = self.sock.recv(size-8)
|
|
|
|
return (object_id, evcode, data)
|
|
|
|
def read_string(self, b):
|
|
return b[4:4+struct.unpack('I', b[:4])[0]-1].decode('utf8')
|
|
|
|
# Will bind the requested interface to object id 4
|
|
def _bind_interfaces(self, interfaces):
|
|
# bind the registry object to id 2
|
|
self.send_msg(1, 1, b'\x02\x00\x00\x00')
|
|
# solicit a sync message to bookend the registry events
|
|
self.send_msg(1, 0, b'\x03\x00\x00\x00')
|
|
|
|
interface_object_number = 4
|
|
while True:
|
|
(obj, event, payload) = self.recv_msg()
|
|
if obj == 2 and event == 0: # registry.global event
|
|
wl_name = struct.unpack('I', payload[0:4])[0]
|
|
wl_interface = self.read_string(payload[4:])
|
|
|
|
for interface in interfaces:
|
|
if wl_interface == interface:
|
|
bind_payload = payload + struct.pack('I', interface_object_number)
|
|
self.send_msg(2, 0, bind_payload)
|
|
setattr(self, interface, interface_object_number)
|
|
interface_object_number += 1
|
|
|
|
if obj == 3: # sync message
|
|
for interface in interfaces:
|
|
if not hasattr(self, interface):
|
|
raise Exception(f"Could not find interface {interface}")
|
|
return
|
|
|
|
|
|
class Wlroots():
|
|
def __init__(self, on_window_change):
|
|
self.wl = Wayland('zwlr_foreign_toplevel_manager_v1')
|
|
self.on_window_change = on_window_change
|
|
|
|
def init(self):
|
|
pass
|
|
|
|
def run(self):
|
|
windows = {}
|
|
active_window = None
|
|
|
|
for msg in new_interruptible_generator(self.wl.sock, self.wl.recv_msg):
|
|
if msg == None:
|
|
self.on_window_change(active_window['appid'], active_window['title'])
|
|
continue
|
|
|
|
(obj, event, payload) = msg
|
|
if obj == self.wl.zwlr_foreign_toplevel_manager_v1 and event == 0:
|
|
# zwlr_foreign_toplevel_manager_v1::toplevel event
|
|
windows[struct.unpack('I', payload)[0]] = {'title': '', 'appid': ''}
|
|
continue
|
|
elif obj in windows:
|
|
if event == 0:
|
|
# zwlr_foreign_toplevel_handle_v1::title event
|
|
windows[obj]['title'] = self.wl.read_string(payload)
|
|
elif event == 1:
|
|
# zwlr_foreign_toplevel_handle_v1::app_id event
|
|
windows[obj]['appid'] = self.wl.read_string(payload)
|
|
elif event == 4:
|
|
# zwlr_foreign_toplevel_handle_v1::state event
|
|
if active_window == windows[obj]:
|
|
active_window = None
|
|
window_is_active = False
|
|
|
|
array_size = struct.unpack('I', payload[0:4])[0]
|
|
for i in range(0, array_size, 4):
|
|
start_offset = i + 4
|
|
end_offset = start_offset + 4
|
|
state = struct.unpack('I', payload[start_offset:end_offset])[0]
|
|
# zwlr_foreign_toplevel_handle_v1::state enum -> activated
|
|
if state == 2:
|
|
window_is_active = True
|
|
|
|
if window_is_active:
|
|
active_window = windows[obj]
|
|
elif event == 5 and active_window == windows[obj]:
|
|
# zwlr_foreign_toplevel_handle_v1::done event
|
|
self.on_window_change(active_window['appid'], active_window['title'])
|
|
elif event == 6:
|
|
# zwlr_foreign_toplevel_handle_v1::closed event
|
|
closed_window = windows.pop(obj)
|
|
if closed_window == active_window:
|
|
active_window = None
|
|
|
|
class Cosmic():
|
|
def __init__(self, on_window_change):
|
|
self.wl = Wayland('zcosmic_toplevel_info_v1')
|
|
self.on_window_change = on_window_change
|
|
|
|
def init(self):
|
|
pass
|
|
|
|
def run(self):
|
|
windows = {}
|
|
while True:
|
|
(obj, event, payload) = self.wl.recv_msg()
|
|
if obj not in windows:
|
|
windows[obj]={}
|
|
|
|
if event == 2:
|
|
windows[obj]['title'] = self.wl.read_string(payload)
|
|
if event == 3:
|
|
windows[obj]['appid'] = self.wl.read_string(payload)
|
|
if event == 8 and payload[0] > 0 and payload[4] == 2:
|
|
self.on_window_change(windows[obj].get('appid', ''), windows[obj].get('title', ''))
|
|
|
|
class XMonitor():
|
|
def __init__(self, on_window_change):
|
|
assert_env('DISPLAY')
|
|
|
|
self.on_window_change = on_window_change
|
|
|
|
def init(self):
|
|
import Xlib
|
|
import Xlib.display
|
|
|
|
self.dpy = Xlib.display.Display()
|
|
self.dpy.screen().root.change_attributes(
|
|
event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask)
|
|
|
|
self._NET_WM_NAME = self.dpy.intern_atom('_NET_WM_NAME')
|
|
self.WM_NAME = self.dpy.intern_atom('WM_NAME')
|
|
|
|
|
|
def get_window_info(self, win):
|
|
def get_title(win):
|
|
title = ''
|
|
try:
|
|
title = win.get_full_property(self._NET_WM_NAME, 0).value.decode('utf8')
|
|
except:
|
|
try:
|
|
title = win.get_full_property(self.WM_NAME, 0).value.decode('latin1', 'replace')
|
|
except:
|
|
pass
|
|
|
|
return title
|
|
|
|
while win:
|
|
cls = win.get_wm_class()
|
|
if cls:
|
|
return (cls[1], get_title(win))
|
|
|
|
win = win.query_tree().parent
|
|
|
|
return ("root", "")
|
|
|
|
def run(self):
|
|
import Xlib
|
|
|
|
last_active_class = ""
|
|
last_active_title = ""
|
|
|
|
_NET_WM_STATE = self.dpy.intern_atom('_NET_WM_STATE', False)
|
|
_NET_WM_STATE_ABOVE = self.dpy.intern_atom('_NET_WM_STATE_ABOVE', False)
|
|
_NET_WM_WINDOW_TYPE_NOTIFICATION = self.dpy.intern_atom('_NET_WM_WINDOW_TYPE_NOTIFICATION', False)
|
|
_NET_WM_WINDOW_TYPE = self.dpy.intern_atom('_NET_WM_WINDOW_TYPE', False)
|
|
|
|
def get_floating_window():
|
|
q = [self.dpy.screen().root]
|
|
while q:
|
|
w = q.pop()
|
|
q.extend(w.query_tree().children)
|
|
|
|
v = w.get_full_property(_NET_WM_STATE, Xlib.Xatom.ATOM)
|
|
|
|
if v and v.value and v.value[0] == _NET_WM_STATE_ABOVE:
|
|
types = w.get_full_property(_NET_WM_WINDOW_TYPE, Xlib.Xatom.ATOM)
|
|
|
|
# Ignore persistent notification windows like dunst
|
|
if not types or _NET_WM_WINDOW_TYPE_NOTIFICATION not in types.value:
|
|
return w
|
|
|
|
return None
|
|
|
|
def get_active_window():
|
|
win = get_floating_window()
|
|
if win != None:
|
|
return win
|
|
|
|
return self.dpy.get_input_focus().focus
|
|
|
|
for ev in new_interruptible_generator(self.dpy.fileno(), self.dpy.next_event, lambda: not self.dpy.pending_events()):
|
|
if ev == None:
|
|
self.on_window_change(last_active_class, last_active_title)
|
|
else:
|
|
try:
|
|
win = get_active_window()
|
|
|
|
if isinstance(win, int) or win == None:
|
|
continue
|
|
|
|
win.change_attributes(event_mask = Xlib.X.SubstructureNotifyMask|Xlib.X.PropertyChangeMask)
|
|
|
|
cls, title = self.get_window_info(win)
|
|
|
|
if cls != last_active_class or title != last_active_title:
|
|
last_active_class = cls
|
|
last_active_title = title
|
|
|
|
self.on_window_change(cls, title)
|
|
except:
|
|
pass
|
|
|
|
|
|
# :(
|
|
class GnomeMonitor():
|
|
def __init__(self, on_window_change):
|
|
assert_gnome()
|
|
|
|
self.on_window_change = on_window_change
|
|
|
|
self.fifo_path = (os.getenv('XDG_RUNTIME_DIR') or '/run/user/'+str(os.getuid())) + '/keyd.fifo'
|
|
|
|
def init(self):
|
|
if not os.path.exists(self.fifo_path):
|
|
print("""Gnome extension doesn't appear to be running:
|
|
|
|
You will need to install the keyd gnome extension in order for the
|
|
application mapper to work correctly.
|
|
|
|
This can usually be achieved by running:
|
|
|
|
rm -r ~/.local/share/gnome-shell/extensions/keyd # Remove any older versions of the extension
|
|
mkdir -p ~/.local/share/gnome-shell/extensions
|
|
|
|
Followed by:
|
|
|
|
Gnome 42-44:
|
|
ln -s /usr/local/share/keyd/gnome-extension ~/.local/share/gnome-shell/extensions/keyd
|
|
|
|
Gnome 45/46:
|
|
ln -s /usr/local/share/keyd/gnome-extension-45 ~/.local/share/gnome-shell/extensions/keyd
|
|
|
|
Finally restart Gnome and run:
|
|
|
|
gnome-extensions enable keyd
|
|
gnome-extensions show keyd (verify the extension is enabled)
|
|
|
|
NOTE:
|
|
You may need to adjust the above paths (e.g /usr/share/keyd/gnome-extension)
|
|
depending on your distro.
|
|
""")
|
|
exit(0)
|
|
|
|
def run(self):
|
|
fh = open(self.fifo_path)
|
|
last_cls = ''
|
|
last_title = ''
|
|
|
|
for line in new_interruptible_generator(fh.fileno(), fh.readline, None):
|
|
if line == None:
|
|
self.on_window_change(last_cls, last_title)
|
|
continue
|
|
|
|
try:
|
|
(cls, title) = line.strip('\n').split('\t')
|
|
last_cls = cls
|
|
last_title = title
|
|
except:
|
|
cls = ''
|
|
title = ''
|
|
|
|
self.on_window_change(cls, title)
|
|
|
|
def get_monitor(on_window_change):
|
|
monitors = [
|
|
('kde', KDE),
|
|
('wlroots', Wlroots),
|
|
('cosmic', Cosmic),
|
|
('Gnome', GnomeMonitor),
|
|
('X', XMonitor),
|
|
]
|
|
|
|
for name, mon in monitors:
|
|
try:
|
|
m = mon(on_window_change)
|
|
print(f'{name} detected')
|
|
return m
|
|
except:
|
|
pass
|
|
|
|
print('Could not detect app environment :(.')
|
|
sys.exit(-1)
|
|
|
|
def lock():
|
|
global lockfh
|
|
lockfh = open(LOCKFILE, 'w')
|
|
try:
|
|
fcntl.flock(lockfh, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except:
|
|
die('only one instance may run at a time')
|
|
|
|
def daemonize():
|
|
print(f'Daemonizing, log output will be stored in {LOGFILE}...')
|
|
|
|
fh = open(LOGFILE, 'w')
|
|
|
|
os.close(1)
|
|
os.close(2)
|
|
os.dup2(fh.fileno(), 1)
|
|
os.dup2(fh.fileno(), 2)
|
|
|
|
if os.fork(): exit(0)
|
|
if os.fork(): exit(0)
|
|
|
|
opt = argparse.ArgumentParser()
|
|
opt.add_argument('-v', '--verbose', default=False, action='store_true', help='Log the active window (useful for discovering window and class names)')
|
|
opt.add_argument('-d', '--daemonize', default=False, action='store_true', help='fork and run in the background')
|
|
args = opt.parse_args()
|
|
|
|
if not os.path.exists(CONFIG_PATH):
|
|
die('could not find app.conf, make sure it is in ~/.config/keyd/app.conf')
|
|
|
|
config = parse_config(CONFIG_PATH)
|
|
lock()
|
|
|
|
def lookup_bindings(cls, title):
|
|
bindings = []
|
|
for cexp, texp, b in config:
|
|
if fnmatch(cls, cexp) and fnmatch(title, texp):
|
|
dbg(f'\tMatched {cexp}|{texp}')
|
|
bindings.extend(b)
|
|
|
|
return bindings
|
|
|
|
def normalize_class(s):
|
|
return re.sub('[^A-Za-z0-9]+', '-', s).strip('-').lower()
|
|
|
|
def normalize_title(s):
|
|
return re.sub(r'[\W_]+', '-', s).strip('-').lower()
|
|
|
|
last_mtime = os.path.getmtime(CONFIG_PATH)
|
|
def on_window_change(cls, title):
|
|
global last_mtime
|
|
global config
|
|
|
|
cls = normalize_class(cls)
|
|
title = normalize_title(title)
|
|
|
|
mtime = os.path.getmtime(CONFIG_PATH)
|
|
|
|
if mtime != last_mtime:
|
|
print(CONFIG_PATH + ': Updated, reloading config...')
|
|
config = parse_config(CONFIG_PATH)
|
|
last_mtime = mtime
|
|
|
|
if args.verbose:
|
|
print(f'Active window: {cls}|{title}')
|
|
|
|
bindings = lookup_bindings(cls, title)
|
|
subprocess.run([KEYD_BIN, 'bind', 'reset', *bindings], stdout=subprocess.DEVNULL)
|
|
|
|
|
|
mon = get_monitor(on_window_change)
|
|
mon.init()
|
|
|
|
if args.daemonize:
|
|
daemonize()
|
|
|
|
mon.run()
|