"""
Git command wrapper.
Provides a high-level interface to git commands for worktree management.
"""
import os
import re
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional
class GitCommandError(Exception):
    """
    Error signalling that a git invocation did not succeed.

    Attributes:
        return_code: Exit status reported for the command; -1 when no
            exit status was supplied by the raiser.
        command: The command line associated with the failure, or an
            empty string when none was supplied.
    """

    def __init__(self, message: str, return_code: int = -1, command: str = ""):
        """
        Build the error.

        Args:
            message: Human-readable description; becomes ``str(error)``.
            return_code: Exit status to record on the exception.
            command: Command line to record on the exception.
        """
        super().__init__(message)
        self.command = command
        self.return_code = return_code
class GitRepo:
    """
    High-level wrapper for git commands related to worktree management.

    Every operation shells out to the ``git`` executable through
    :meth:`run`, which converts failures into ``GitCommandError`` and
    returns stripped stdout. The remaining methods build argument lists
    and parse git's machine-readable output into plain Python values.
    """

    # Fully-qualified ref prefixes used when parsing git output.
    _HEADS_PREFIX = "refs/heads/"
    _ORIGIN_PREFIX = "refs/remotes/origin/"

    def __init__(self, path: Optional[str] = None):
        """
        Initialize GitRepo.

        Args:
            path: Path to the git repository. If None (or empty), the
                current working directory is used.

        Raises:
            GitCommandError: If the path is not a valid git repository.
        """
        self._path = Path(path).resolve() if path else Path.cwd()
        if not self.is_git_repo():
            raise GitCommandError(
                f"Not a git repository: {self._path}",
                command="git rev-parse --git-dir",
            )

    @property
    def path(self) -> Path:
        """Return the repository path."""
        return self._path

    def run(self, *args: str, check: bool = True, cwd: Optional[str] = None) -> str:
        """
        Run a git command and return its stdout.

        Args:
            *args: Git command arguments (e.g., "status", "--porcelain").
            check: If True, raise GitCommandError on non-zero exit code.
            cwd: Working directory for the command. Defaults to repo path.

        Returns:
            Stdout output as a stripped string. Note that stripping also
            removes leading whitespace of the first output line.

        Raises:
            GitCommandError: If git cannot be started, the working
                directory is unusable, the command times out (60 s), or
                the command fails while ``check`` is True.
        """
        cmd = ["git"] + list(args)
        cmd_text = " ".join(cmd)
        work_dir = cwd or str(self._path)
        try:
            result = subprocess.run(
                cmd,
                cwd=work_dir,
                capture_output=True,
                text=True,
                timeout=60,
            )
        except FileNotFoundError as exc:
            # subprocess raises FileNotFoundError both for a missing
            # executable and for a missing cwd; tell the two apart so the
            # message does not wrongly blame the git installation.
            if not os.path.isdir(work_dir):
                raise GitCommandError(
                    f"Working directory does not exist: {work_dir}",
                    command=cmd_text,
                ) from exc
            raise GitCommandError(
                "git command not found. Please ensure git is installed.",
                command="git",
            ) from exc
        except subprocess.TimeoutExpired as exc:
            raise GitCommandError(
                f"Git command timed out: {cmd_text}",
                command=cmd_text,
            ) from exc
        except OSError as exc:
            # e.g. cwd is a file (NotADirectoryError) or is not accessible.
            raise GitCommandError(
                f"Failed to execute git in {work_dir}: {exc}",
                command=cmd_text,
            ) from exc

        if check and result.returncode != 0:
            raise GitCommandError(
                f"git {' '.join(args)} failed: {result.stderr.strip()}",
                return_code=result.returncode,
                command=cmd_text,
            )
        return result.stdout.strip()

    def _local_branches(self, *filters: str) -> List[str]:
        """
        Return local branch names, optionally narrowed by for-each-ref filters.

        ``git for-each-ref`` is used instead of ``git branch --list``
        because the latter decorates its output ("* " for the current
        branch, "+ " for branches checked out in other worktrees, and a
        "(HEAD detached at ...)" pseudo-entry).

        Args:
            *filters: Extra ``for-each-ref`` options such as
                ``--merged=<commit>``.

        Raises:
            GitCommandError: If the underlying git command fails.
        """
        output = self.run(
            "for-each-ref", "--format=%(refname)", *filters, self._HEADS_PREFIX
        )
        skip = len(self._HEADS_PREFIX)
        return [
            ref[skip:]
            for ref in (line.strip() for line in output.splitlines())
            if ref.startswith(self._HEADS_PREFIX)
        ]

    def is_git_repo(self) -> bool:
        """
        Check if the current path is inside a git repository.

        Returns:
            True if ``git rev-parse --git-dir`` printed a git directory,
            False otherwise (including when git could not be run).
        """
        try:
            return bool(self.run("rev-parse", "--git-dir", check=False))
        except GitCommandError:
            return False

    def get_root(self) -> Path:
        """
        Get the root directory of the git repository.

        Returns:
            Path to the repository root.

        Raises:
            GitCommandError: If the git command fails.
        """
        return Path(self.run("rev-parse", "--show-toplevel"))

    def get_current_branch(self) -> str:
        """
        Get the name of the current branch.

        Returns:
            Current branch name, or the first 7 characters of the HEAD
            commit hash when HEAD is detached.

        Raises:
            GitCommandError: If HEAD cannot be resolved at all.
        """
        try:
            name = self.run("rev-parse", "--abbrev-ref", "HEAD")
        except GitCommandError:
            name = ""
        # In detached HEAD state git succeeds and prints the literal
        # "HEAD", so that value must be treated like a failure.
        if name and name != "HEAD":
            return name
        return self.run("rev-parse", "HEAD")[:7]

    def get_default_branch(self) -> str:
        """
        Get the default branch name.

        Resolution order:
            1. ``init.defaultBranch`` from git config, if that branch
               exists locally.
            2. ``main`` and then ``master``, if present locally.
            3. The branch ``refs/remotes/origin/HEAD`` points to.
            4. The current branch.

        Returns:
            Default branch name. If git fails along the way, the
            configured ``init.defaultBranch`` (when set) or "main".
        """
        configured = ""
        try:
            configured = self.run("config", "init.defaultBranch", check=False)
            local = set(self._local_branches())

            # init.defaultBranch only describes newly created repositories,
            # so trust it only when such a branch really exists here.
            if configured and configured in local:
                return configured
            for candidate in ("main", "master"):
                if candidate in local:
                    return candidate

            ref = self.run("symbolic-ref", "refs/remotes/origin/HEAD", check=False)
            if ref.startswith(self._ORIGIN_PREFIX):
                # Slice off the prefix rather than taking the last path
                # component, so names such as "release/1.0" stay intact.
                remainder = ref[len(self._ORIGIN_PREFIX):]
                if remainder:
                    return remainder
            elif ref:
                return ref.split("/")[-1]

            # Last resort: use current branch
            return self.get_current_branch()
        except GitCommandError:
            return configured or "main"

    def list_worktrees(self) -> List[Dict[str, Any]]:
        """
        List all git worktrees.

        Parses the output of 'git worktree list --porcelain' into
        structured dictionaries.

        Returns:
            List of worktree information dictionaries with keys:
                - path: str - Worktree path
                - branch: str - Branch name (or "detached"); absent for
                  a bare entry, which reports neither
                - commit: str - Full commit hash; absent for a bare entry
                - is_main: bool - Whether this is the main worktree
                - is_locked: bool - Whether this worktree is locked
                - is_prunable: bool - Whether this worktree is prunable
                - is_bare: bool - Whether this entry is a bare repository

        Raises:
            GitCommandError: If the git command fails.
        """
        output = self.run("worktree", "list", "--porcelain")
        if not output:
            return []

        worktrees: List[Dict[str, Any]] = []
        entry: Dict[str, Any] = {}
        for raw in output.split("\n"):
            line = raw.strip()
            if not line:
                # A blank line terminates one worktree record.
                if entry:
                    worktrees.append(entry)
                    entry = {}
                continue

            if line.startswith("worktree "):
                entry["path"] = line[len("worktree "):]
            elif line.startswith("HEAD "):
                entry["commit"] = line[len("HEAD "):]
            elif line.startswith("branch "):
                # branch refs/heads/feature-x -> feature-x
                ref = line[len("branch "):]
                if ref.startswith(self._HEADS_PREFIX):
                    ref = ref[len(self._HEADS_PREFIX):]
                entry["branch"] = ref
            elif line.startswith("detached"):
                entry["branch"] = "detached"
            elif line.startswith("locked"):
                # May be followed by a reason, which is not recorded.
                entry["is_locked"] = True
            elif line.startswith("prunable"):
                entry["is_prunable"] = True
            elif line == "bare":
                entry["is_bare"] = True

        # The final record has no trailing blank line once stdout is stripped.
        if entry:
            worktrees.append(entry)

        # git lists the main worktree first.
        for index, wt in enumerate(worktrees):
            wt["is_main"] = index == 0
            wt.setdefault("is_locked", False)
            wt.setdefault("is_prunable", False)
            wt.setdefault("is_bare", False)
        return worktrees

    def add_worktree(
        self,
        path: str,
        branch: Optional[str] = None,
        create_branch: bool = False,
        track: Optional[str] = None,
        checkout: bool = True,
    ) -> None:
        """
        Create a new git worktree.

        The command is assembled as
        ``git worktree add [--no-checkout] [-b <branch> [--track]] <path> [<commit-ish>]``.

        Args:
            path: Path for the new worktree directory.
            branch: Branch to check out, or the name of the branch to
                create when ``create_branch`` is True. If None, git
                picks the branch itself.
            create_branch: If True, create a new branch named ``branch``.
            track: Remote branch to start from and track (e.g.,
                "origin/feature-x"). Only used together with
                ``create_branch`` and ``branch``.
            checkout: If False, pass ``--no-checkout``.

        Raises:
            GitCommandError: If the worktree cannot be created.
        """
        creating = bool(create_branch and branch)
        args: List[str] = ["worktree", "add"]
        if not checkout:
            args.append("--no-checkout")
        if creating:
            args.extend(["-b", str(branch)])
            if track:
                # --track is a flag; the upstream itself is the commit-ish.
                args.append("--track")

        # git expects <path> first, then the optional <commit-ish>.
        args.append(path)
        if creating:
            if track:
                args.append(track)
        elif branch:
            args.append(branch)
        self.run(*args)

    def remove_worktree(self, path: str, force: bool = False) -> None:
        """
        Remove a git worktree.

        Args:
            path: Path to the worktree directory to remove.
            force: If True, pass ``--force`` to remove even if there are
                uncommitted changes.

        Raises:
            GitCommandError: If the worktree cannot be removed.
        """
        args = ["worktree", "remove"]
        if force:
            args.append("--force")
        args.append(path)
        self.run(*args)

    def prune_worktrees(self) -> str:
        """
        Prune stale worktree entries.

        Removes administrative files for worktrees that have been
        deleted manually.

        Returns:
            Stdout of ``git worktree prune``.

        Raises:
            GitCommandError: If the git command fails.
        """
        return self.run("worktree", "prune")

    def get_branch_status(self, branch: str) -> Dict[str, Any]:
        """
        Get the status of a branch relative to its tracking branch.

        Args:
            branch: Branch name to check.

        Returns:
            Dictionary with keys:
                - ahead: int - Number of commits ahead of tracking branch
                - behind: int - Number of commits behind tracking branch
                - dirty: bool - Whether the worktree that has this
                  branch checked out has uncommitted changes
                - status: str - "dirty", "diverged", "ahead", "behind"
                  or "clean" (in that order of precedence)

            Git failures are treated as "no information": the affected
            fields keep their zero/False defaults.
        """
        result: Dict[str, Any] = {
            "ahead": 0,
            "behind": 0,
            "dirty": False,
            "status": "clean",
        }

        # Ahead/behind counts. The upstream must be qualified with the
        # branch name; a bare "@{u}" would mean the upstream of whatever
        # branch is currently checked out.
        try:
            counts = self.run(
                "rev-list", "--left-right", "--count",
                f"{branch}...{branch}@{{upstream}}",
                check=False,
            )
            parts = counts.split("\t") if counts else []
            if len(parts) == 2:
                result["ahead"] = int(parts[0])
                result["behind"] = int(parts[1])
        except (GitCommandError, ValueError):
            pass

        # Dirty check: only possible if some worktree has the branch
        # checked out.
        try:
            wt_path = next(
                (
                    wt.get("path")
                    for wt in self.list_worktrees()
                    if wt.get("branch") == branch
                ),
                None,
            )
            if wt_path and self.run("status", "--porcelain", cwd=wt_path):
                result["dirty"] = True
        except GitCommandError:
            pass

        # Overall status
        if result["dirty"]:
            result["status"] = "dirty"
        elif result["ahead"] > 0 and result["behind"] > 0:
            result["status"] = "diverged"
        elif result["ahead"] > 0:
            result["status"] = "ahead"
        elif result["behind"] > 0:
            result["status"] = "behind"
        return result

    def merge_branch(
        self,
        source: str,
        target: str,
        no_ff: bool = False,
    ) -> bool:
        """
        Merge a source branch into a target branch.

        Checks out ``target`` in this repository's working directory
        and then merges ``source`` into it.

        Args:
            source: Source branch to merge from.
            target: Target branch to merge into.
            no_ff: If True, create a merge commit even for fast-forward.

        Returns:
            True if the merge was successful.

        Raises:
            GitCommandError: If the checkout or the merge fails.
        """
        self.run("checkout", target)
        args = ["merge"]
        if no_ff:
            args.append("--no-ff")
        args.append(source)
        self.run(*args)
        return True

    def is_branch_merged(self, branch: str, target: Optional[str] = None) -> bool:
        """
        Check if a branch has been merged into a target branch.

        Args:
            branch: Branch name to check.
            target: Target branch. Defaults to the default branch.

        Returns:
            True if the branch is merged into the target; False if it is
            not, or if git could not answer (e.g. unknown target).
        """
        if target is None:
            target = self.get_default_branch()
        try:
            return branch in self._local_branches(f"--merged={target}")
        except GitCommandError:
            return False

    def delete_branch(self, branch: str, force: bool = False) -> None:
        """
        Delete a branch.

        Args:
            branch: Branch name to delete.
            force: If True, force delete even if not fully merged.

        Raises:
            GitCommandError: If the branch cannot be deleted.
        """
        args = ["branch", "--delete"]
        if force:
            args.append("--force")
        args.append(branch)
        self.run(*args)

    def get_commit_info(self, ref: str = "HEAD", worktree_path: Optional[str] = None) -> Dict[str, str]:
        """
        Get detailed commit information.

        Args:
            ref: Git reference (commit hash, branch, HEAD).
            worktree_path: Working directory for the command. Defaults
                to the repository path.

        Returns:
            Dictionary with keys: hash, short_hash, author, date
            (author date, strict ISO 8601), message (subject line).
            Fields missing from git's output are empty strings.

        Raises:
            GitCommandError: If the git command fails.
        """
        keys = ("hash", "short_hash", "author", "date", "message")
        cwd = worktree_path or str(self._path)
        output = self.run("log", "-1", "--format=%H%n%h%n%an%n%aI%n%s", ref, cwd=cwd)
        fields = output.split("\n")
        # Pad so every key gets a value even when trailing fields are empty
        # (run() strips trailing newlines).
        fields.extend([""] * (len(keys) - len(fields)))
        return dict(zip(keys, fields))

    def get_uncommitted_changes(self, worktree_path: Optional[str] = None) -> Dict[str, int]:
        """
        Get summary of uncommitted changes in a worktree.

        Each ``git status --porcelain`` entry is counted once, using
        the first matching category: untracked, modified (includes
        renames), added, deleted. Other states are not counted.

        Args:
            worktree_path: Path to the worktree. Defaults to repo path.

        Returns:
            Dictionary with keys: modified, added, deleted, untracked.
            All zeros if the git command fails.
        """
        cwd = worktree_path or str(self._path)
        result: Dict[str, int] = {
            "modified": 0,
            "added": 0,
            "deleted": 0,
            "untracked": 0,
        }
        try:
            output = self.run("status", "--porcelain", cwd=cwd)
        except GitCommandError:
            # Best effort: report "no changes" rather than failing.
            return result

        for line in output.split("\n"):
            if not line:
                continue
            # porcelain format: XY filename
            # NOTE: run() strips the whole output, so a leading blank X
            # column on the first line is lost. The categories below
            # treat X and Y almost symmetrically, which keeps the counts
            # stable for the common status codes.
            x = line[0]
            y = line[1] if len(line) > 1 else " "
            if "?" in (x, y):
                result["untracked"] += 1
            elif x in ("M", "R") or y == "M":
                result["modified"] += 1
            elif "A" in (x, y):
                result["added"] += 1
            elif "D" in (x, y):
                result["deleted"] += 1
        return result

    def get_branches(self) -> List[str]:
        """
        Get a list of all local branch names.

        Returns:
            List of branch name strings, without any status decoration.

        Raises:
            GitCommandError: If the git command fails.
        """
        return self._local_branches()

    def get_remote_branches(self) -> List[str]:
        """
        Get a list of remote branch names.

        Symbolic entries such as "origin/HEAD -> origin/main" are
        skipped.

        Returns:
            List of remote branch name strings (e.g., "origin/feature-x").
            Empty if the git command fails.
        """
        try:
            output = self.run("branch", "-r", check=False)
        except GitCommandError:
            return []

        branches = []
        for raw in output.split("\n"):
            name = raw.strip()
            if not name:
                continue
            # Match the HEAD pointer precisely; a substring test would
            # also drop real branches such as "origin/HEADER-fix".
            if " -> " in name or name.endswith("/HEAD"):
                continue
            branches.append(name)
        return branches

    def has_remote_branch(self, branch: str) -> bool:
        """
        Check if a remote branch exists for the given branch name.

        Args:
            branch: Local branch name.

        Returns:
            True if any remote (origin, upstream, ...) has a branch
            with exactly this name.
        """
        for remote_branch in self.get_remote_branches():
            # "<remote>/<branch>"; the branch part may itself contain "/".
            _, sep, name = remote_branch.partition("/")
            if sep and name == branch:
                return True
        return False

    def get_last_modified(self, worktree_path: str) -> str:
        """
        Get the last modification time for a worktree.

        Uses the HEAD commit date of the worktree.

        Args:
            worktree_path: Path to the worktree.

        Returns:
            Date string of the HEAD commit, or "" if it cannot be read.
        """
        try:
            return self.get_commit_info("HEAD", worktree_path=worktree_path).get("date", "")
        except GitCommandError:
            return ""