#!/usr/bin/env python3 import os import re import argparse import sys IFS = (" ", "\t", "\n") class EOFException(Exception): pass class Argument(): def __init__(self, value: str, allow_expansion=True): pass def expand(): pass class Pipeline(): def __init__(self, instructions: list=[]): self.instructions = instructions def execute(self): for instruction in self.instructions: instruction.execute() class Instruction(): def __init__(self, tokens: list=[], stdout=None, stderr=None): self.prog = tokens[0] self.args = tokens[1:] self.stdout = stdout self.stderr = stderr def execute(self): cpid = os.fork() if cpid == 0: if self.stdout is not None: fd = os.open(self.stdout, os.O_WRONLY) os.dup2(sys.stdout.fileno(), fd) if self.stderr is not None: fd = os.open(self.stderr, os.O_WRONLY) os.dup2(sys.stderr.fileno(), fd) os.execvp(self.prog, [self.prog] + self.args) else: _, status = os.waitpid(cpid, 0) return status class AndInstruction(Instruction): def __init__(self, instruction1: Instruction, instruction2: Instruction): self.instruction1 = instruction1 self.instruction2 = instruction2 def execute(self): status = self.instruction1.execute() if os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0: return self.instruction2.execute() class OrInstruction(Instruction): def __init__(self, instruction1: Instruction, instruction2: Instruction): self.instruction1 = instruction1 self.instruction2 = instruction2 def execute(self): status = self.instruction1.execute() if not os.WIFEXITED(status) or not os.WEXITSTATUS(status) == 0: return self.instruction2.execute() class PipelineParser(): def read_escaped(self, string_iterator): return next(string_iterator) def read_literal(self, string_iterator): literal = "" while True: c = next(string_iterator) if c != "'": literal += c else: return literal def read_quoted(self, string_iterator): # TODO handle substitutions quoted = "" while True: c = next(string_iterator) if c == "\\": quoted += self.read_escaped(string_iterator) elif c != '"': quoted += c else: return quoted def get_next_token(self, string_iterator): token = None while True: try: c = next(string_iterator) except StopIteration: break # Skip leading whitespaces if token is None and c in IFS: continue elif token is None: token = "" if c == "'": token += self.read_literal(string_iterator) elif c == "\\": token += self.read_escaped(string_iterator) elif c == "\"": token += self.read_quoted(string_iterator) elif c in IFS: return token else: token += c return token def tokenize(self, string_iterator): while True: token = self.get_next_token(string_iterator) if token is None: break yield token def parse(self, line): tokens = list(self.tokenize(iter(line))) return Pipeline([Instruction(tokens)]) def read_next_pipeline(fh): # TODO Support multiple pipelines per line # TODO Support instructions spawning multiple lines parser = PipelineParser() while True: line = fh.readline() if line == "": raise EOFException() line = line.strip() # We ignore empty lines and lines which start with # if not line.startswith("#") and not line == "": break return parser.parse(line) def main(arguments): for script in arguments["