"""
Git command wrapper.

Provides a high-level interface to git commands for worktree management.
"""

import os
import re
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional

# Ref namespaces used when listing branches through `git for-each-ref`.
_HEADS_PREFIX = "refs/heads/"
_REMOTES_PREFIX = "refs/remotes/"


def _strip_prefix(text: str, prefix: str) -> str:
    """Return ``text`` without a leading ``prefix`` (no-op when absent)."""
    return text[len(prefix):] if text.startswith(prefix) else text


class GitCommandError(Exception):
    """
    Exception raised when a git command fails.

    Attributes:
        return_code: Exit status of the git process; stays at -1 when the
            error is raised without an exit status.
        command: The command line associated with the failure.
    """

    def __init__(self, message: str, return_code: int = -1, command: str = ""):
        self.return_code = return_code
        self.command = command
        super().__init__(message)


class GitRepo:
    """
    High-level wrapper for git commands related to worktree management.

    Provides methods for common git operations with proper error handling
    and structured output parsing.
    """

    def __init__(self, path: Optional[str] = None):
        """
        Initialize GitRepo.

        Args:
            path: Path to the git repository. If None, auto-detects from cwd.

        Raises:
            GitCommandError: If the path is not a valid git repository.
        """
        if path:
            self._path = Path(path).resolve()
        else:
            self._path = Path.cwd()

        if not self.is_git_repo():
            raise GitCommandError(
                f"Not a git repository: {self._path}",
                command="git rev-parse --git-dir",
            )

    @property
    def path(self) -> Path:
        """Return the repository path."""
        return self._path

    def run(
        self,
        *args: str,
        check: bool = True,
        cwd: Optional[str] = None,
        timeout: Optional[float] = 60,
    ) -> str:
        """
        Run a git command and return its stdout.

        Args:
            *args: Git command arguments (e.g., "status", "--porcelain").
            check: If True, raise GitCommandError on non-zero exit code.
            cwd: Working directory for the command. Defaults to repo path.
            timeout: Seconds to wait before giving up (None waits forever).

        Returns:
            Stdout output as a stripped string.

        Raises:
            GitCommandError: If git cannot be started, the working directory
                is unusable, the command times out, or (when check is True)
                the command exits with a non-zero status.
        """
        cmd = ["git", *args]
        cmd_str = " ".join(cmd)
        work_dir = cwd or str(self._path)

        try:
            result = subprocess.run(
                cmd,
                cwd=work_dir,
                capture_output=True,
                text=True,
                # Decode explicitly so output containing bytes that are invalid
                # in the locale encoding cannot raise UnicodeDecodeError.
                encoding="utf-8",
                errors="replace",
                timeout=timeout,
            )
        except FileNotFoundError as err:
            # A missing cwd raises the same exception type as a missing git
            # executable; tell the two apart so the message is not misleading.
            if not os.path.isdir(work_dir):
                raise GitCommandError(
                    f"Working directory does not exist: {work_dir}",
                    command=cmd_str,
                ) from err
            raise GitCommandError(
                "git command not found. Please ensure git is installed.",
                command="git",
            ) from err
        except subprocess.TimeoutExpired as err:
            raise GitCommandError(
                f"Git command timed out: {cmd_str}",
                command=cmd_str,
            ) from err
        except OSError as err:
            # e.g. cwd is a regular file, or permission denied.
            raise GitCommandError(
                f"Failed to run git: {err}",
                command=cmd_str,
            ) from err

        if check and result.returncode != 0:
            raise GitCommandError(
                f"git {' '.join(args)} failed: {result.stderr.strip()}",
                return_code=result.returncode,
                command=cmd_str,
            )

        return result.stdout.strip()

    def _list_refs(self, prefix: str, *options: str, check: bool = True) -> List[str]:
        """
        List ref names below ``prefix`` with the prefix removed.

        Uses `git for-each-ref`, whose output carries no worktree/current
        markers, so names can be used as-is.
        """
        output = self.run(
            "for-each-ref",
            "--format=%(refname)",
            *options,
            prefix.rstrip("/"),
            check=check,
        )
        return [
            _strip_prefix(line.strip(), prefix)
            for line in output.splitlines()
            if line.strip()
        ]

    def is_git_repo(self) -> bool:
        """
        Check if the current path is inside a git repository.

        Returns:
            True if inside a git repository, False otherwise.
        """
        try:
            # With check=False a failing rev-parse yields empty stdout.
            return bool(self.run("rev-parse", "--git-dir", check=False))
        except GitCommandError:
            return False

    def get_root(self) -> Path:
        """
        Get the root directory of the git repository.

        Returns:
            Path to the repository root.

        Raises:
            GitCommandError: If the git command fails.
        """
        return Path(self.run("rev-parse", "--show-toplevel"))

    def get_current_branch(self) -> str:
        """
        Get the name of the current branch.

        Returns:
            Current branch name, or the first 7 characters of the HEAD commit
            hash when HEAD is detached.

        Raises:
            GitCommandError: If HEAD is detached and cannot be resolved.
        """
        # symbolic-ref prints the full ref for a branch (including one with no
        # commits yet) and prints nothing when HEAD is detached.
        head_ref = self.run("symbolic-ref", "--quiet", "HEAD", check=False)
        if head_ref:
            return _strip_prefix(head_ref, _HEADS_PREFIX)
        return self.run("rev-parse", "HEAD")[:7]

    def get_default_branch(self) -> str:
        """
        Get the default branch name.

        Resolution order: the `init.defaultBranch` config value (only when a
        local branch of that name exists), then 'main', then 'master', then
        the branch `origin/HEAD` points at, then the current branch.

        Returns:
            Default branch name; "main" if git cannot be queried.
        """
        try:
            configured = self.run("config", "init.defaultBranch", check=False)
        except GitCommandError:
            configured = ""

        try:
            branches = self.get_branches()

            # init.defaultBranch describes newly created repositories, so it
            # is only trusted when this repository actually has that branch.
            if configured and configured in branches:
                return configured
            if "main" in branches:
                return "main"
            if "master" in branches:
                return "master"

            # Fall back to the symbolic ref for the remote HEAD.
            try:
                ref = self.run(
                    "symbolic-ref", "refs/remotes/origin/HEAD", check=False
                )
            except GitCommandError:
                ref = ""
            if ref:
                # Keep everything after the remote name so that branch names
                # containing "/" are returned intact.
                return _strip_prefix(ref, _REMOTES_PREFIX + "origin/")

            # Last resort: use current branch.
            return self.get_current_branch()
        except GitCommandError:
            return "main"

    def list_worktrees(self) -> List[Dict[str, Any]]:
        """
        List all git worktrees.

        Parses the output of 'git worktree list --porcelain' into structured
        dictionaries.

        Returns:
            List of worktree information dictionaries with keys:
                - path: str - Worktree path
                - branch: str - Branch name, "detached", or "" if not reported
                - commit: str - Full commit hash, or "" if not reported
                - is_main: bool - Whether this is the main worktree
                - is_bare: bool - Whether the entry is a bare repository
                - is_locked: bool - Whether this worktree is locked
                - is_prunable: bool - Whether this worktree is prunable

        Raises:
            GitCommandError: If the git command fails.
        """
        output = self.run("worktree", "list", "--porcelain")
        if not output:
            return []

        worktrees: List[Dict[str, Any]] = []
        entry: Dict[str, Any] = {}

        for raw_line in output.split("\n"):
            line = raw_line.strip()

            # A blank line terminates the current worktree record.
            if not line:
                if entry:
                    worktrees.append(entry)
                    entry = {}
                continue

            if line.startswith("worktree "):
                entry["path"] = line[len("worktree "):]
            elif line.startswith("HEAD "):
                entry["commit"] = line[len("HEAD "):]
            elif line.startswith("branch "):
                # branch refs/heads/feature-x -> feature-x
                entry["branch"] = _strip_prefix(
                    line[len("branch "):], _HEADS_PREFIX
                )
            elif line.startswith("detached"):
                entry["branch"] = "detached"
            elif line == "bare":
                entry["is_bare"] = True
            elif line.startswith("locked"):
                entry["is_locked"] = True
            elif line.startswith("prunable"):
                entry["is_prunable"] = True

        # run() strips the trailing blank line, so flush the last record.
        if entry:
            worktrees.append(entry)

        # The first entry is treated as the main worktree.
        if worktrees:
            worktrees[0]["is_main"] = True

        for wt in worktrees:
            # Bare entries have no HEAD/branch lines; fill every documented
            # key so callers can index without KeyError.
            wt.setdefault("path", "")
            wt.setdefault("commit", "")
            wt.setdefault("branch", "")
            wt.setdefault("is_main", False)
            wt.setdefault("is_bare", False)
            wt.setdefault("is_locked", False)
            wt.setdefault("is_prunable", False)

        return worktrees

    def add_worktree(
        self,
        path: str,
        branch: Optional[str] = None,
        create_branch: bool = False,
        track: Optional[str] = None,
        checkout: bool = True,
    ) -> None:
        """
        Create a new git worktree.

        Args:
            path: Path for the new worktree directory.
            branch: Branch to check out, or the name of the branch to create
                when create_branch is True. If None, no branch argument is
                passed and git chooses.
            create_branch: If True, create a new branch.
            track: Remote branch to start the new branch from and track
                (e.g., "origin/feature-x"). Only used together with
                create_branch and branch.
            checkout: If True, check out the branch in the new worktree.

        Raises:
            GitCommandError: If the worktree cannot be created.
        """
        # git expects: worktree add [options] <path> [<commit-ish>]
        args: List[str] = ["worktree", "add"]
        commit_ish: Optional[str] = None

        if not checkout:
            args.append("--no-checkout")

        if create_branch and branch:
            args.extend(["-b", branch])
            if track:
                # --track is a flag; the upstream is given as the commit-ish.
                args.append("--track")
                commit_ish = track
        elif branch:
            commit_ish = branch

        args.append(path)
        if commit_ish:
            args.append(commit_ish)

        self.run(*args)

    def remove_worktree(self, path: str, force: bool = False) -> None:
        """
        Remove a git worktree.

        Args:
            path: Path to the worktree directory to remove.
            force: If True, force removal even if there are uncommitted changes.

        Raises:
            GitCommandError: If the worktree cannot be removed.
        """
        args = ["worktree", "remove"]
        if force:
            args.append("--force")
        args.append(path)
        self.run(*args)

    def prune_worktrees(self) -> str:
        """
        Prune stale worktree entries.

        Removes administrative files for worktrees that have been deleted
        manually.

        Returns:
            Output from the prune command.

        Raises:
            GitCommandError: If the git command fails.
        """
        return self.run("worktree", "prune")

    def get_branch_status(self, branch: str) -> Dict[str, Any]:
        """
        Get the status of a branch relative to its tracking branch.

        Args:
            branch: Branch name to check.

        Returns:
            Dictionary with keys:
                - ahead: int - Number of commits ahead of tracking branch
                - behind: int - Number of commits behind tracking branch
                - dirty: bool - Whether the worktree that has this branch
                  checked out has uncommitted changes
                - status: str - One of "dirty", "diverged", "ahead",
                  "behind", "clean"
        """
        result: Dict[str, Any] = {
            "ahead": 0,
            "behind": 0,
            "dirty": False,
            "status": "clean",
        }

        # Ahead/behind counts. The upstream must be qualified with the branch
        # name: a bare "@{u}" means the upstream of the current HEAD.
        try:
            counts = self.run(
                "rev-list",
                "--left-right",
                "--count",
                f"{branch}...{branch}@{{u}}",
                check=False,
            ).split()
            if len(counts) == 2:
                ahead, behind = int(counts[0]), int(counts[1])
                result["ahead"] = ahead
                result["behind"] = behind
        except (GitCommandError, ValueError):
            # Best effort: leave the counts at zero.
            pass

        # Dirty check, run inside the worktree that has the branch checked out.
        try:
            wt_path = next(
                (
                    wt.get("path")
                    for wt in self.list_worktrees()
                    if wt.get("branch") == branch
                ),
                None,
            )
            if wt_path and self.run("status", "--porcelain", cwd=wt_path):
                result["dirty"] = True
        except GitCommandError:
            # Best effort: treat as not dirty.
            pass

        # Overall status; "dirty" takes precedence over ahead/behind.
        if result["dirty"]:
            result["status"] = "dirty"
        elif result["ahead"] > 0 and result["behind"] > 0:
            result["status"] = "diverged"
        elif result["ahead"] > 0:
            result["status"] = "ahead"
        elif result["behind"] > 0:
            result["status"] = "behind"
        else:
            result["status"] = "clean"

        return result

    def merge_branch(
        self,
        source: str,
        target: str,
        no_ff: bool = False,
    ) -> bool:
        """
        Merge a source branch into a target branch.

        Checks out ``target`` in the repository path and leaves it checked
        out afterwards.

        Args:
            source: Source branch to merge from.
            target: Target branch to merge into.
            no_ff: If True, create a merge commit even for fast-forward.

        Returns:
            True if the merge was successful.

        Raises:
            GitCommandError: If the checkout or the merge fails.
        """
        self.run("checkout", target)

        args = ["merge"]
        if no_ff:
            args.append("--no-ff")
        args.append(source)
        self.run(*args)
        return True

    def is_branch_merged(self, branch: str, target: Optional[str] = None) -> bool:
        """
        Check if a branch has been merged into a target branch.

        Args:
            branch: Branch name to check.
            target: Target branch. Defaults to the default branch.

        Returns:
            True if the branch is merged into the target; False otherwise,
            including when git cannot be queried.
        """
        if target is None:
            target = self.get_default_branch()

        try:
            merged = self._list_refs(
                _HEADS_PREFIX, f"--merged={target}", check=False
            )
        except GitCommandError:
            return False
        return branch in merged

    def delete_branch(self, branch: str, force: bool = False) -> None:
        """
        Delete a branch.

        Args:
            branch: Branch name to delete.
            force: If True, force delete even if not fully merged.

        Raises:
            GitCommandError: If the branch cannot be deleted.
        """
        args = ["branch", "--delete"]
        if force:
            args.append("--force")
        args.append(branch)
        self.run(*args)

    def get_commit_info(
        self, ref: str = "HEAD", worktree_path: Optional[str] = None
    ) -> Dict[str, str]:
        """
        Get detailed commit information.

        Args:
            ref: Git reference (commit hash, branch, HEAD).
            worktree_path: Working directory for the command.

        Returns:
            Dictionary with keys: hash, short_hash, author, date, message.
            Fields missing from the output are empty strings.

        Raises:
            GitCommandError: If the git command fails.
        """
        cwd = worktree_path or str(self._path)
        fmt = "%H%n%h%n%an%n%aI%n%s"
        # The trailing "--" marks ref as a revision, so a file of the same
        # name cannot make the argument ambiguous.
        output = self.run("log", "-1", f"--format={fmt}", ref, "--", cwd=cwd)

        fields = output.split("\n")
        keys = ("hash", "short_hash", "author", "date", "message")
        return {
            key: fields[idx] if idx < len(fields) else ""
            for idx, key in enumerate(keys)
        }

    def get_uncommitted_changes(
        self, worktree_path: Optional[str] = None
    ) -> Dict[str, int]:
        """
        Get summary of uncommitted changes in a worktree.

        Args:
            worktree_path: Path to the worktree. Defaults to repo root.

        Returns:
            Dictionary with keys: modified, added, deleted, untracked.
            All counts are zero if the status command fails.
        """
        cwd = worktree_path or str(self._path)
        result: Dict[str, int] = {
            "modified": 0,
            "added": 0,
            "deleted": 0,
            "untracked": 0,
        }

        try:
            output = self.run("status", "--porcelain", cwd=cwd)
        except GitCommandError:
            return result

        # NOTE: run() strips its output, so a leading space on the very first
        # status line is lost and its two columns shift left by one.
        for line in output.split("\n"):
            if not line:
                continue

            # porcelain format: XY filename
            x = line[0]
            y = line[1] if len(line) > 1 else " "

            if "?" in (x, y):
                result["untracked"] += 1
            elif x in ("M", "R") or y == "M":
                result["modified"] += 1
            elif "A" in (x, y):
                result["added"] += 1
            elif "D" in (x, y):
                result["deleted"] += 1

        return result

    def get_branches(self) -> List[str]:
        """
        Get a list of all local branch names.

        Returns:
            List of branch name strings.

        Raises:
            GitCommandError: If the git command fails.
        """
        return self._list_refs(_HEADS_PREFIX)

    def get_remote_branches(self) -> List[str]:
        """
        Get a list of remote branch names.

        Returns:
            List of remote branch name strings (e.g., "origin/feature-x").
            Entries whose last path component is "HEAD" are omitted. Empty
            if git cannot be queried.
        """
        try:
            names = self._list_refs(_REMOTES_PREFIX, check=False)
        except GitCommandError:
            return []
        return [name for name in names if name.rsplit("/", 1)[-1] != "HEAD"]

    def has_remote_branch(self, branch: str) -> bool:
        """
        Check if a remote branch exists for the given branch name.

        Args:
            branch: Local branch name.

        Returns:
            True if any remote has a branch with that name.
        """
        for remote_branch in self.get_remote_branches():
            # "<remote>/<branch>": compare everything after the remote name.
            _, sep, name = remote_branch.partition("/")
            if sep and name == branch:
                return True
        return False

    def get_last_modified(self, worktree_path: str) -> str:
        """
        Get the last modification time for a worktree.

        Uses the HEAD commit date of the worktree.

        Args:
            worktree_path: Path to the worktree.

        Returns:
            Author date of HEAD as reported by git's %aI format, or "" if
            it cannot be read.
        """
        try:
            info = self.get_commit_info("HEAD", worktree_path=worktree_path)
        except GitCommandError:
            return ""
        return info.get("date", "")