"""Change TCP keepalive settings of a socket owned by another running process.

The script attaches gdb to the target PID and makes the target process call
``setsockopt`` on the requested file descriptor.
"""

import argparse
import logging
import pprint
import subprocess

# Numeric constants passed verbatim as setsockopt() arguments in the target.
# NOTE(review): SO_SOCKET / SO_TCP are used as the setsockopt *level* argument;
# the values look like Linux's SOL_SOCKET / IPPROTO_TCP — confirm for other
# platforms.
SO_SOCKET = 1
SO_TCP = 6
SO_KEEPALIVE = 9
TCP_KEEPIDLE = 4
TCP_KEEPINTVL = 5
TCP_KEEPCNT = 6


def _setsockopt_int_commands(sockfd, level, optname, value):
    """Return the gdb commands that set one int-valued socket option.

    The commands reserve ``sizeof(int)`` bytes by lowering ``$rsp``, store
    *value* there, call ``setsockopt(sockfd, level, optname, $rsp,
    sizeof(int))`` and finally restore ``$rsp``.

    NOTE(review): ``$rsp`` is the x86-64 stack pointer register, so this
    presumably only works on x86-64 targets — confirm.
    """
    return (
        "set $rsp -= sizeof(int)",
        f"set {{int}}$rsp = {value}",
        f"call (int)setsockopt({sockfd}, {level}, {optname}, $rsp, sizeof(int))",
        "set $rsp += sizeof(int)",
    )


def get_commands_enable_keepalive(sockfd: int, **kwargs):
    """Return gdb commands that set SO_KEEPALIVE to 1 on *sockfd*.

    Extra keyword arguments are accepted and ignored so that ``main`` can
    pass the complete parsed-argument dict.
    """
    return _setsockopt_int_commands(sockfd, SO_SOCKET, SO_KEEPALIVE, 1)


def get_commands_disable_keepalive(sockfd: int, **kwargs):
    """Return gdb commands that set SO_KEEPALIVE to 0 on *sockfd*.

    Extra keyword arguments are accepted and ignored so that ``main`` can
    pass the complete parsed-argument dict.
    """
    return _setsockopt_int_commands(sockfd, SO_SOCKET, SO_KEEPALIVE, 0)


def get_commands_change_keepalive_time(sockfd: int, value: int, **kwargs):
    """Return gdb commands that set TCP_KEEPIDLE to *value* on *sockfd*.

    Extra keyword arguments are accepted and ignored: ``main`` calls the
    builder with every parsed argument (``pid``, ``loglevel``, ``command``,
    ...), which previously raised ``TypeError`` here.
    """
    return _setsockopt_int_commands(sockfd, SO_TCP, TCP_KEEPIDLE, value)


def get_commands_change_keepalive_intvl(sockfd: int, value: int, **kwargs):
    """Return gdb commands that set TCP_KEEPINTVL to *value* on *sockfd*.

    Extra keyword arguments are accepted and ignored: ``main`` calls the
    builder with every parsed argument (``pid``, ``loglevel``, ``command``,
    ...), which previously raised ``TypeError`` here.
    """
    return _setsockopt_int_commands(sockfd, SO_TCP, TCP_KEEPINTVL, value)


def get_commands_change_keepalive_count(sockfd: int, value: int, **kwargs):
    """Return gdb commands that set TCP_KEEPCNT to *value* on *sockfd*.

    Extra keyword arguments are accepted and ignored: ``main`` calls the
    builder with every parsed argument (``pid``, ``loglevel``, ``command``,
    ...), which previously raised ``TypeError`` here.
    """
    return _setsockopt_int_commands(sockfd, SO_TCP, TCP_KEEPCNT, value)


def main(args):
    """Run gdb against ``args["pid"]`` with the selected command sequence.

    Args:
        args: Mapping of parsed arguments. Must contain ``loglevel``, ``pid``
            and ``command`` (a callable returning gdb commands); the whole
            mapping is forwarded to ``command`` as keyword arguments.

    Raises:
        ValueError: If ``loglevel`` does not name an integer logging level.
        subprocess.CalledProcessError: If gdb exits with a non-zero status.
    """
    numeric_level = getattr(logging, args["loglevel"].upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError(f"Invalid log level: {args['loglevel']}")
    logging.basicConfig(level=numeric_level)

    # The builder receives every parsed argument and picks what it needs.
    gdb_cmds = list(args["command"](**args))
    logging.debug("gdb commands: %s", pprint.pformat(gdb_cmds))

    cmd = ["gdb", "--pid", str(args["pid"]), "--batch-silent"]
    cmd.extend(f"--eval-command={gdb_cmd}" for gdb_cmd in gdb_cmds)
    logging.debug("Executing: %s", pprint.pformat(cmd))
    subprocess.run(cmd, check=True)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--pid", type=int, required=True, help="PID of the running process to alter")
    parser.add_argument("--sockfd", type=int, required=True, help="Socket file-descriptor number to alter")
    parser.add_argument("--loglevel", default="info", help="Change log level")
    subparsers = parser.add_subparsers(help="Command")

    # Enable TCP keepalive
    subparser = subparsers.add_parser(
        "enable-keepalive",
        aliases=["enable-ka"],
        help="Enable TCP keepalive on the specified socket",
    )
    subparser.set_defaults(command=get_commands_enable_keepalive)

    # Disable TCP keepalive
    subparser = subparsers.add_parser(
        "disable-keepalive",
        aliases=["disable-ka"],
        help="Disable TCP keepalive on the specified socket",
    )
    subparser.set_defaults(command=get_commands_disable_keepalive)

    # Set TCP keepalive time
    subparser = subparsers.add_parser(
        "set-keepalive-time",
        aliases=["set-ka-time"],
        help="Set TCP keepalive time (i.e. net.ipv4.tcp_keepalive_time)",
    )
    subparser.add_argument("value", type=int)
    subparser.set_defaults(command=get_commands_change_keepalive_time)

    # Set TCP keepalive interval
    subparser = subparsers.add_parser(
        "set-keepalive-interval",
        aliases=["set-ka-intvl", "set-ka-interval"],
        help="Set TCP keepalive interval (i.e. net.ipv4.tcp_keepalive_intvl)",
    )
    subparser.add_argument("value", type=int)
    subparser.set_defaults(command=get_commands_change_keepalive_intvl)

    # Set TCP keepalive count
    subparser = subparsers.add_parser(
        "set-keepalive-count",
        aliases=["set-ka-count"],
        help="Set TCP keepalive count (i.e. net.ipv4.tcp_keepalive_probes)",
    )
    subparser.add_argument("value", type=int)
    subparser.set_defaults(command=get_commands_change_keepalive_count)

    parsed_args = parser.parse_args()
    # "command" is only set by a subparser; without one there is nothing to
    # run, so report a usage error instead of failing later with KeyError.
    if not hasattr(parsed_args, "command"):
        parser.error("a command is required")
    main(vars(parsed_args))