diff --git a/README.md b/README.md index 6deec25..b874d6e 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ By filtering such anomalies, we can hopefully remove chatter without impeding ac ## Installation -Download the repository as a zip and extract the file. The dependencies are listed in the requirements.txt. And you can install it with the command below. +Download the repository as a zip and extract the file. The dependencies are listed in the requirements.txt. And you can install it with the command below. ```shell sudo pip3 install -r requirements.txt @@ -71,6 +71,12 @@ sudo python3 -m src - -v {0,1,2}, --verbosity {0,1,2} +- -c CONFIG_PATH, --config-path CONFIG_PATH + - Path to the configuration file specifying the threshold for each key (absolute or relative to the main project directory). The default path is `config.yaml` in the main project directory. If the file does not exist, the default threshold or the provided THRESHOLD will be used for all keys. + +- -n, --new-config + - Create a new configuration file at CONFIG_PATH with commented-out entries for all available keys for a given keyboard. To customize the threshold for a key, uncomment the line and set the threshold value in milliseconds. + ## Automation Starting the script manually every time doesn't sound like the greatest idea, so @@ -90,12 +96,17 @@ Then, copy the `chattering_fix.service` to `/etc/systemd/system/` and enable it ```shell systemctl enable --now chattering_fix ``` -You can check if the systemd unit file is properly working using +You can check if the systemd unit file is properly working using ```shell systemctl status chattering_fix.service ``` -You can also use +You can also use ```shell journalctl -xeu chattering_fix.service ``` -just to make sure that there are no errors. \ No newline at end of file +just to make sure that there are no errors. + +To change individual keys thresholds while the service is running, edit the config file as you normally would and then restart the service with the command below. +```shell +systemctl restart chattering_fix.service +``` diff --git a/src/__main__.py b/src/__main__.py index 96fc06d..775362d 100755 --- a/src/__main__.py +++ b/src/__main__.py @@ -6,14 +6,20 @@ import libevdev from src.filtering import filter_chattering -from src.keyboard_retrieval import retrieve_keyboard_name, INPUT_DEVICES_PATH, abs_keyboard_path +from src.keyboard_retrieval import ( + INPUT_DEVICES_PATH, + abs_keyboard_path, + create_config_file, + parse_config_file, + retrieve_keyboard_name, +) @contextmanager def get_device_handle(keyboard_name: str) -> libevdev.Device: - """ Safely get an evdev device handle. """ + """Safely get an evdev device handle.""" - fd = open(abs_keyboard_path(keyboard_name), 'rb') + fd = open(abs_keyboard_path(keyboard_name), "rb") evdev = libevdev.Device(fd) try: yield evdev @@ -26,9 +32,12 @@ def get_device_handle(keyboard_name: str) -> libevdev.Device: parser.add_argument('-k', '--keyboard', type=str, default=str(), help=f"Name of your chattering keyboard device as listed in {INPUT_DEVICES_PATH}. " f"If left unset, will be attempted to be retrieved automatically.") - parser.add_argument('-t', '--threshold', type=int, default=30, help="Filter time threshold in milliseconds. " + parser.add_argument('-t', '--threshold', type=int, default=None, help="Filter time threshold in milliseconds. " "Default=30ms.") parser.add_argument('-v', '--verbosity', type=int, default=1, choices=[0, 1, 2]) + parser.add_argument('-c', '--config', type=str, default='config.yaml', help="Path to the configuration file.") + parser.add_argument('-n', '--new-config', action='store_true', help="Create a new configuration file at the path " + "defined by --config.") args = parser.parse_args() logging.basicConfig( @@ -43,8 +52,30 @@ def get_device_handle(keyboard_name: str) -> libevdev.Device: ) ], format="%(asctime)s - %(message)s", - datefmt="%H:%M:%S" + datefmt="%H:%M:%S", ) + # Load the config file if it exists. + try: + with open(args.config, "r") as file: + config = parse_config_file(file) + logging.info(f"Using configuration from {args.config}.") + logging.debug(f"Configuration: {config}") + except FileNotFoundError: + config = {} + + # Use common threshold from the config if it was specified. + # Otherwise, use the default value of 30 ms. + if "default" not in config: + config["default"] = 30 + + # Allow overriding the threshold from the command line. + if args.threshold is not None: + config["default"] = args.threshold + logging.info(f"Overriding default threshold with {args.threshold} ms from command line.") + with get_device_handle(args.keyboard or retrieve_keyboard_name()) as device: - filter_chattering(device, args.threshold) + if args.new_config: + create_config_file(device, args.config, default_threshold=config["default"]) + logging.info(f"New configuration file created at {args.config}.") + filter_chattering(device, config) diff --git a/src/filtering.py b/src/filtering.py index 8fc185d..98206e2 100644 --- a/src/filtering.py +++ b/src/filtering.py @@ -5,7 +5,7 @@ import libevdev -def filter_chattering(evdev: libevdev.Device, threshold: int) -> NoReturn: +def filter_chattering(evdev: libevdev.Device, thresholds: dict) -> NoReturn: # grab the device - now only we see the events it emits evdev.grab() # create a copy of the device that we can write to - this will emit the filtered events to anyone who listens @@ -16,43 +16,50 @@ def filter_chattering(evdev: libevdev.Device, threshold: int) -> NoReturn: while True: # since the descriptor is blocking, this blocks until there are events available for e in evdev.events(): - if _from_keystroke(e, threshold): + if _from_keystroke(e, thresholds): ui_dev.send_events([e, libevdev.InputEvent(libevdev.EV_SYN.SYN_REPORT, 0)]) -def _from_keystroke(event: libevdev.InputEvent, threshold: int) -> bool: +def _from_keystroke(event: libevdev.InputEvent, thresholds: dict) -> bool: # no need to relay those - we are going to emit our own if event.matches(libevdev.EV_SYN) or event.matches(libevdev.EV_MSC): return False + # not sure when this would happen, but let's not crash + if event.code is None or event.value is None: + return False + # some events we don't want to filter, like EV_LED for toggling NumLock and the like, and also key hold events if not event.matches(libevdev.EV_KEY) or event.value > 1: - logging.debug(f'FORWARDING {event.code}') + logging.debug(f"FORWARDING {event.code}") return True # the values are 0 for up, 1 for down and 2 for hold if event.value == 0: if _key_pressed[event.code]: - logging.debug(f'FORWARDING {event.code} up') - _last_key_up[event.code] = event.sec * 1E6 + event.usec + logging.debug(f"FORWARDING {event.code} up") + _last_key_up[event.code] = event.sec * 1e6 + event.usec _key_pressed[event.code] = False return True else: - logging.info(f'FILTERING {event.code} up: key not pressed beforehand') + logging.info(f"FILTERING {event.code} up: key not pressed beforehand") return False prev = _last_key_up.get(event.code) - now = event.sec * 1E6 + event.usec + now = event.sec * 1e6 + event.usec - if prev is None or now - prev > threshold * 1E3: - logging.debug(f'FORWARDING {event.code} down') + if prev is None or now - prev > thresholds.get(event.code, thresholds["default"]) * 1e3: + logging.debug(f"FORWARDING {event.code} down") + if prev is not None and now - prev < thresholds.get(event.code, thresholds["default"]) * 1e3 * 2: + logging.debug(f"POTENTIAL chatter on {event.code} down: " + f"last key up event happened {(now - prev) / 1e3} ms ago. " + f"FORWARDING press") _key_pressed[event.code] = True return True - logging.info( - f'FILTERED {event.code} down: last key up event happened {(now - prev) / 1E3} ms ago') + logging.info(f"FILTERED {event.code} down: last key up event happened {(now - prev) / 1e3} ms ago") return False -_last_key_up: Dict[libevdev.EventCode, int] = {} -_key_pressed: DefaultDict[libevdev.EventCode, bool] = defaultdict(bool) \ No newline at end of file +_last_key_up: Dict[libevdev.EventCode, float] = {} +_key_pressed: DefaultDict[libevdev.EventCode, bool] = defaultdict(bool) diff --git a/src/keyboard_retrieval.py b/src/keyboard_retrieval.py index 057a750..e569fb1 100644 --- a/src/keyboard_retrieval.py +++ b/src/keyboard_retrieval.py @@ -1,9 +1,15 @@ import logging import os +import shutil +from io import TextIOWrapper from typing import Final -INPUT_DEVICES_PATH: Final = '/dev/input/by-id' -_KEYBOARD_NAME_SUFFIX: Final = '-kbd' +import libevdev +import yaml + +INPUT_DEVICES_PATH: Final = "/dev/input/by-id" +_KEYBOARD_NAME_SUFFIX: Final = "-kbd" + def retrieve_keyboard_name() -> str: keyboard_devices = list(filter(lambda d: d.endswith(_KEYBOARD_NAME_SUFFIX), os.listdir(INPUT_DEVICES_PATH))) @@ -14,12 +20,12 @@ def retrieve_keyboard_name() -> str: if n_devices == 1: logging.info(f"Found keyboard: {keyboard_devices[0]}") return keyboard_devices[0] - + # Use native Python input for user selection print("Select a device:") for idx, device in enumerate(keyboard_devices, start=1): print(f"{idx}. {device}") - + selected_idx = -1 while selected_idx < 1 or selected_idx > n_devices: try: @@ -28,9 +34,48 @@ def retrieve_keyboard_name() -> str: print(f"Please select a number between 1 and {n_devices}") except ValueError: print("Please enter a valid number") - + return keyboard_devices[selected_idx - 1] + def abs_keyboard_path(device: str) -> str: return os.path.join(INPUT_DEVICES_PATH, device) + +def create_config_file(device: libevdev.Device, config_path: str, default_threshold: int) -> None: + # Create a new configuration file with the default threshold. + with open(config_path, "w") as file: + file.write(f"default: {default_threshold}\n") + + # Write each supported key event with the default threshold, but commented out. + supported = device.evbits + for event_type, event_codes in supported.items(): + if event_type == libevdev.EV_KEY: + for event_code in event_codes: + file.write(f'# "{event_code.name}:{event_code.value}": {default_threshold}\n') + + # Get the current user who invoked sudo (e.g., $SUDO_USER). + user = os.environ.get("SUDO_USER") + if user is None: + raise Exception("Script must be run with sudo.") + + # Change ownership to the invoking user. + shutil.chown(config_path, user=user) + + # Set permissions (read/write for the user, read-only for others). + os.chmod(config_path, 0o644) # rw-r--r-- + + +def parse_config_file(config_file: TextIOWrapper) -> dict: + input_config = yaml.safe_load(config_file) + + # Convert the keys to EventCodes. + config = {} + for key, value in input_config.items(): + if key == "default": + config[key] = value + else: + event_code = libevdev.evbit("EV_KEY", key.split(":")[0]) + config[event_code] = value + + return config