#!/usr/bin/env python3 """ Alter the socket options of a running process """ import argparse import subprocess import logging import pprint SO_SOCKET = 1 SO_TCP = 6 SO_KEEPALIVE = 9 TCP_KEEPIDLE = 4 TCP_KEEPINTVL = 5 TCP_KEEPCNT = 6 MSG_NOSIGNAL = 0x4000 def _get_commands_enable_disable_keepalive(sockfd: int, enable: bool, **kwargs): """ Return GDB instructions to enable or disable keepalive on a given socket """ return ( "set $rsp -= sizeof(int)", f"set {{int}}$rsp = {1 if enable else 0}", f"call (int)setsockopt({sockfd}, {SO_SOCKET}, {SO_KEEPALIVE}, $rsp, sizeof(int))", "set $rsp += sizeof(int)", ) def _get_commands_change_tcp_option(sockfd: int, value: int, param: int, **kwargs): """ Return GDB instructions to change a given TCP option on a given socket """ return ( "set $rsp -= sizeof(int)", f"set {{int}}$rsp = {value}", f"call (int)setsockopt({sockfd}, {SO_TCP}, {param}, $rsp, sizeof(int))", "set $rsp += sizeof(int)", ) def get_commands_send_data(sockfd: int, data: str, **kwargs): """ Return GDB instructions to send raw data through a socket """ if '"' in data or "\n" in data: raise ValueError("Double-quotes and newlines are not supported") return [f'call (int)send({sockfd}, "{data}", {len(data)}, {hex(MSG_NOSIGNAL)})'] def get_commands_enable_keepalive(**kwargs): """ Return GDB commands to enable keepalive on a given socket """ return _get_commands_enable_disable_keepalive(enable=True, **kwargs) def get_commands_disable_keepalive(**kwargs): """ Return GDB commands to disable keepalive on a given socket """ return _get_commands_enable_disable_keepalive(enable=False, **kwargs) def get_commands_change_keepalive_time(**kwargs): """ Return GDB commands to set the TCP keepalive time on a given socket """ return _get_commands_change_tcp_option(param=TCP_KEEPIDLE, **kwargs) def get_commands_change_keepalive_intvl(**kwargs): """ Return GDB commands to set the TCP keepalive interval on a given socket """ return _get_commands_change_tcp_option(param=TCP_KEEPINTVL, **kwargs) def get_commands_change_keepalive_count(**kwargs): """ Return GDB commands to set the TCP keepalive count on a given socket """ return _get_commands_change_tcp_option(param=TCP_KEEPCNT, **kwargs) def main(args): numeric_level = getattr(logging, args["loglevel"].upper(), None) if not isinstance(numeric_level, int): raise ValueError(f"Invalid log level: {args['loglevel']}") logging.basicConfig(level=numeric_level) cmd = ["gdb", "--pid", str(args["pid"]), "--batch-silent"] gdb_cmds = list() gdb_cmds.extend(args["command"](**args)) logging.debug("gdb commands: %s", pprint.pformat(gdb_cmds)) cmd.extend(map(lambda gdb_cmd: f"--eval-command={gdb_cmd}", gdb_cmds)) logging.debug("Executing: %s", pprint.pformat(cmd)) subprocess.run(cmd, check=True) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--pid", type=int, required=True, help="PID of the running process to alter") parser.add_argument("--sockfd", type=int, required=True, help="Socket file-descriptor number to alter") parser.add_argument("--loglevel", default="info", help="Change log level") subparsers = parser.add_subparsers(help="Command") # Enable TCP keepalive subparser = subparsers.add_parser("enable-keepalive", aliases=["enable-ka"], help="Enable TCP keepalive on the specified socket") subparser.set_defaults(command=get_commands_enable_keepalive) # Disable TCP keepalive subparser = subparsers.add_parser("disable-keepalive", aliases=["disable-ka"], help="Disable TCP keepalive on the specified socket") subparser.set_defaults(command=get_commands_disable_keepalive) # Set TCP keepalive time subparser = subparsers.add_parser("set-keepalive-time", aliases=["set-ka-time"], help="Set TCP keepalive time (i.e. net.ipv4.tcp_keepalive_time)") subparser.add_argument("value", type=int) subparser.set_defaults(command=get_commands_change_keepalive_time) # Set TCP keepalive interval subparser = subparsers.add_parser("set-keepalive-interval", aliases=["set-ka-intvl", "set-ka-interval"], help="Set TCP keepalive interval (i.e. net.ipv4.tcp_keepalive_intvl)") subparser.add_argument("value", type=int) subparser.set_defaults(command=get_commands_change_keepalive_intvl) # Set TCP keepalive count subparser = subparsers.add_parser("set-keepalive-count", aliases=["set-ka-count"], help="Set TCP keepalive count (i.e. net.ipv4.tcp_keepalive_probes)") subparser.add_argument("value", type=int) subparser.set_defaults(command=get_commands_change_keepalive_count) # Send data subparser = subparsers.add_parser("send-data", aliases=["send"], help="Send data through the socket") subparser.add_argument("data", help="Raw data") subparser.set_defaults(command=get_commands_send_data) main(vars(parser.parse_args()))