#!python3 import subprocess as sp import _thread as thread import argparse import csv import os import sys import time # if these strings are found in output of git/git-pw, # we need to take some actions. prune_warining = "warning: There are too many unreachable loose objects; run 'git prune' to remove them." resource_not_found_warning = "Resource not found" already_applied_warning = "No changes -- Patch already applied." # These lists will contain merged and unmnerged series data. merged = [] unmerged = [] unavailable = [] already_applied = [] # parse the csv entries def read_rows(csvfile): # List for series entries series_data = [] csvreader = csv.reader(csvfile, delimiter=",", quotechar='"') for row in csvreader: print(row) if not row: return if row and row[0] != "ID": series_data.append(row) return series_data def run_cmd(cmd, debug=False): """ Execute command and return the exit code and output. """ exit_code = 0 output = "" try: output = sp.check_output( cmd, stderr=sp.STDOUT, shell=True, universal_newlines=True ) except sp.CalledProcessError as exc: if debug: print("Status : FAIL", exc.returncode, exc.output) exit_code, output = exc.returncode, exc.output else: if debug: print("{}\n".format(output)) return exit_code, output def write_file(filename, list_): """ This function is used to write the IDs for patches/series that are merged/unmerged/unavailable after we have processed everything. """ with open(filename, "w") as f: for i in list_: f.write(i[0] + "\n") def write_json(file_name, data): import json with open(file_name, "w") as f: f.write(json.dumps(data)) def apply_(series): """if git throws a warning saying "warning: There are too many unreachable loose objects; run 'git prune' to remove them." `git prune` will be executed. otherwise output will be printed and exit code will be returned. """ for i in series: try: print( f"{bcolors.OKGREEN}trying to apply:{type_}{bcolors.OKCYAN} {i[0]} {bcolors.ENDC}" ) print(f"{bcolors.OKBLUE} {i[1]}, {bcolors.UNDERLINE}{i[2]}{bcolors.ENDC}") if i[0] == "ID": pass exit_code, output = run_cmd(f"git-pw {type_} apply {i[0]}") if prune_warining in output: print("running: git prune") run_cmd("git prune") if exit_code == 1 and resource_not_found_warning in output: print(f"{bcolors.WARNING}patch unavailable{bcolors.ENDC}") unavailable.append(i) if exit_code: # if `git-pw patch/series apply ` fails # resetting to HEAD print( f"{bcolors.OKCYAN}git exit code: {bcolors.FAIL}{exit_code}{bcolors.ENDC}" ) unmerged.append(i) else: if output.strip().endswith(already_applied_warning): print( f"{bcolors.WARNING}No changes -- already applied {bcolors.ENDC}\n" ) already_applied.append(i) else: print(f"{bcolors.OKCYAN}{type_} applied{bcolors.ENDC}\n") merged.append(i) print(f"{bcolors.FAIL}resetting to HEAD: {bcolors.ENDC}") if os.path.exists(".git/rebase-apply"): run_cmd("git am --abort") run_cmd("git reset --hard master", debug=True) except KeyboardInterrupt as ke: break except Exception as e: print(e) break def get_patches(from_page=1, to_page=100): cmd = "git-pw patch list --page {0} -f csv --state 'new'" patches = [] for i in range(from_page, to_page): exit_code, output = run_cmd(cmd.format(i), debug=True) if exit_code: print(f"git-pw exited with exit code {exit_code}") # patches.extend(output.strip().split("\n")) break patches.extend(output.strip().split("\n")) # print(patches) return patches def get_series(from_page=1, to_page=100): cmd = "git-pw series list --page {0} -f csv" series = [] for i in range(from_page, to_page): exit_code, output = run_cmd(cmd.format(i), debug=True) if exit_code: print(f"git-pw exited with exit code {exit_code}") # series.extend(output.strip().split("\n")) break series.extend(output.strip().split("\n")) return series def get_patches_for_series(list_, index, series_dict, patch_ids): for i in list_: print(f"running: git-pw series show {i} -f csv") ret, op = run_cmd(f"git-pw series show {i} -f csv") print("****", index, "****") if ret: print(f"exitted with {ret}: {op}") series_data = read_rows(op.strip().split("\n")) print(series_data[1]) for j in series_data[11:]: patch_data = j[1].split() print(patch_data[0], patch_data[1]) series_dict[i].append(patch_data[0]) if patch_data[0] in patch_ids: patch_ids.remove(patch_data[0]) def get_individual_patches(): """This function iterates over all of the series and all of the patches. All of the patches that do not belong to any series are dumped into a file. """ # TODO: This function needs a refactor. file_loc = "/tmp/pwanalysis" if not os.path.exists(file_loc): os.mkdir(file_loc) series = [i for i in read_rows(get_series())] # print(series) series_dict = {i[0]: [] for i in series} patches = read_rows(get_patches()) patch_ids = [i[0] for i in patches] # print(patch_ids) series_ids = [i for i in series_dict.keys()] get_patches_for_series(series_ids, 0, series_dict, patch_ids) print("Individual patches", len(patch_ids)) print("series_dict", series_dict) write_json(file_loc + "/dict", series_dict) write_json(file_loc + "/patch_ids", {"patches": patch_ids}) if __name__ == "__main__": parser = argparse.ArgumentParser(description="Initial Ci script for patchwork") parser.add_argument( "-c", "--colors", default=False, action="store_true", help="Enable colors" ) parser.add_argument( "-t", "--type", type=str, default="series", choices=["patch", "series"], help="type: patch/series", ) parser.add_argument( "-a", "--action", type=str, default="apply", help="action: list/apply" ) parser.add_argument( "-o", "--output-location", type=str, default="/tmp/pw-results", help="location for the output files containing merged, umerged and unavailable patches/series.", ) parser.add_argument( "-i", "--input-file", type=str, default="", help="input file: csv file or '-' for the standard input. If no file is specified\ this data will be pulled from patchwork instance.", ) parser.add_argument( "-p", "--page-range", default="1-100", help="page range for patchwork in the format 'from_pageNo'-'to_pageNo' for example '1-100'", ) parser.add_argument( "-u", "--get-individual-patches", default=False, action="store_true", help="""Get individual patches. In this case the series data and the patches data is pulled and compared to find out the individual patches that do not belong to any series.""", ) args = parser.parse_args() print(args) csv_data = [] if args.input_file == "-": # Get the csv data from stdin for line in sys.stdin: if not '"ID"' in line: print(line) csv_data.append(line.strip()) elif os.path.exists(args.input_file): data = open(args.input_file).read().strip().split("\n") for line in data: if not '"ID"' in line: # print(line) csv_data.append(line.strip()) # option that we will be operating upon, series or the patch # this is the command line argument to git-pw # for example "git-pw patch apply 12345" or "git-pw series apply "12356" type_ = args.type series_data = read_rows(csv_data) output_files_loc = args.output_location if not os.path.exists(output_files_loc): os.mkdir(output_files_loc) if args.get_individual_patches: print("getting individual patches") get_individual_patches() sys.exit(0) colors = args.colors class bcolors: if colors: HEADER = "\033[95m" OKBLUE = "\033[94m" OKCYAN = "\033[96m" OKGREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" else: HEADER = "" OKBLUE = "" OKCYAN = "" OKGREEN = "" WARNING = "" FAIL = "" ENDC = "" BOLD = "" UNDERLINE = "" print(len(series_data)) if args.action == "apply": if args.input_file == "": try: from_range, to_range = [int(i) for i in args.page_range.split("-")] except ValueError as ve: print("invalid page range") sys.exit(1) series_data = [i for i in read_rows(get_series(from_range, to_range + 1))] apply_(series_data) print( "total merged: {0}, total unmerged {1}, total unavailable{2}".format( len(merged), len(unmerged), len(unavailable) ) ) write_file(f"{output_files_loc}/merged.txt", merged) write_file(f"{output_files_loc}/unmerged.txt", unmerged) write_file(f"{output_files_loc}/unavailable.txt", unavailable) write_file(f"{output_files_loc}/already_applied.txt", already_applied)