Skip to content
Permalink
Browse files
Merge pull request #1308 from Yobmod/main
Add remaining types to symbolic.py
  • Loading branch information
Yobmod committed Jul 31, 2021
2 parents 300330d + 2a350b5 commit 05825dcaaf6bf3de7d7e2bc595ad6dcb145f11e8
@@ -39,10 +39,10 @@

# typing ---------------------------------------------------------------------------

from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, List, Mapping,
from typing import (Any, AnyStr, BinaryIO, Callable, Dict, IO, Iterator, List, Mapping,
Sequence, TYPE_CHECKING, TextIO, Tuple, Union, cast, overload)

from git.types import PathLike, Literal, TBD
from git.types import PathLike, Literal

if TYPE_CHECKING:
from git.repo.base import Repo
@@ -146,11 +146,11 @@ def dashify(string: str) -> str:
return string.replace('_', '-')


def slots_to_dict(self, exclude: Sequence[str] = ()) -> Dict[str, Any]:
def slots_to_dict(self: object, exclude: Sequence[str] = ()) -> Dict[str, Any]:
return {s: getattr(self, s) for s in self.__slots__ if s not in exclude}


def dict_to_slots_and__excluded_are_none(self, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
for k, v in d.items():
setattr(self, k, v)
for k in excluded:
@@ -192,7 +192,7 @@ class Git(LazyMixin):
def __getstate__(self) -> Dict[str, Any]:
return slots_to_dict(self, exclude=self._excluded_)

def __setstate__(self, d) -> None:
def __setstate__(self, d: Dict[str, Any]) -> None:
dict_to_slots_and__excluded_are_none(self, d, excluded=self._excluded_)

# CONFIGURATION
@@ -434,10 +434,13 @@ def wait(self, stderr: Union[None, bytes] = b'') -> int:
if self.proc is not None:
status = self.proc.wait()

def read_all_from_possibly_closed_stream(stream):
try:
return stderr + force_bytes(stream.read())
except ValueError:
def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
if stream:
try:
return stderr + force_bytes(stream.read())
except ValueError:
return stderr or b''
else:
return stderr or b''

if status != 0:
@@ -907,7 +910,7 @@ def _kill_process(pid: int) -> None:
if self.GIT_PYTHON_TRACE == 'full':
cmdstr = " ".join(redacted_command)

def as_text(stdout_value):
def as_text(stdout_value: Union[bytes, str]) -> str:
return not output_stream and safe_decode(stdout_value) or '<OUTPUT_STREAM>'
# end

@@ -932,10 +935,10 @@ def as_text(stdout_value):
else:
return stdout_value

def environment(self):
def environment(self) -> Dict[str, str]:
return self._environment

def update_environment(self, **kwargs):
def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]:
"""
Set environment variables for future git invocations. Return all changed
values in a format that can be passed back into this function to revert
@@ -962,7 +965,7 @@ def update_environment(self, **kwargs):
return old_env

@contextmanager
def custom_environment(self, **kwargs):
def custom_environment(self, **kwargs: Any) -> Iterator[None]:
"""
A context manager around the above ``update_environment`` method to restore the
environment back to its previous state after operation.
@@ -1043,6 +1046,13 @@ def _call_process(self, method: str, *args: None, **kwargs: None
) -> str:
... # if no args given, execute called with all defaults

@overload
def _call_process(self, method: str,
istream: int,
as_process: Literal[True],
*args: Any, **kwargs: Any
) -> 'Git.AutoInterrupt': ...

@overload
def _call_process(self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], 'Git.AutoInterrupt']:
@@ -1156,7 +1166,7 @@ def _prepare_ref(self, ref: AnyStr) -> bytes:
return refstr.encode(defenc)

def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any
) -> Union['Git.AutoInterrupt', TBD]:
) -> 'Git.AutoInterrupt':
cur_val = getattr(self, attr_name)
if cur_val is not None:
return cur_val
@@ -1166,12 +1176,16 @@ def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwarg

cmd = self._call_process(cmd_name, *args, **options)
setattr(self, attr_name, cmd)
cmd = cast('Git.AutoInterrupt', cmd)
return cmd

def __get_object_header(self, cmd, ref: AnyStr) -> Tuple[str, str, int]:
cmd.stdin.write(self._prepare_ref(ref))
cmd.stdin.flush()
return self._parse_object_header(cmd.stdout.readline())
def __get_object_header(self, cmd: 'Git.AutoInterrupt', ref: AnyStr) -> Tuple[str, str, int]:
if cmd.stdin and cmd.stdout:
cmd.stdin.write(self._prepare_ref(ref))
cmd.stdin.flush()
return self._parse_object_header(cmd.stdout.readline())
else:
raise ValueError("cmd stdin was empty")

def get_object_header(self, ref: str) -> Tuple[str, str, int]:
""" Use this method to quickly examine the type and size of the object behind
@@ -1200,7 +1214,8 @@ def stream_object_data(self, ref: str) -> Tuple[str, str, int, 'Git.CatFileConte
:note: This method is not threadsafe, you need one independent Command instance per thread to be safe !"""
cmd = self._get_persistent_cmd("cat_file_all", "cat_file", batch=True)
hexsha, typename, size = self.__get_object_header(cmd, ref)
return (hexsha, typename, size, self.CatFileContentStream(size, cmd.stdout))
cmd_stdout = cmd.stdout if cmd.stdout is not None else io.BytesIO()
return (hexsha, typename, size, self.CatFileContentStream(size, cmd_stdout))

def clear_cache(self) -> 'Git':
"""Clear all kinds of internal caches to release resources.
@@ -40,14 +40,15 @@
from io import BytesIO

T_ConfigParser = TypeVar('T_ConfigParser', bound='GitConfigParser')
T_OMD_value = TypeVar('T_OMD_value', str, bytes, int, float, bool)

if sys.version_info[:3] < (3, 7, 2):
# typing.Ordereddict not added until py 3.7.2
from collections import OrderedDict # type: ignore # until 3.6 dropped
OrderedDict_OMD = OrderedDict # type: ignore # until 3.6 dropped
else:
from typing import OrderedDict # type: ignore # until 3.6 dropped
OrderedDict_OMD = OrderedDict[str, List[_T]] # type: ignore[assignment, misc]
OrderedDict_OMD = OrderedDict[str, List[T_OMD_value]] # type: ignore[assignment, misc]

# -------------------------------------------------------------

@@ -97,23 +98,23 @@ def __new__(cls, name: str, bases: TBD, clsdict: Dict[str, Any]) -> TBD:
return new_type


def needs_values(func: Callable) -> Callable:
def needs_values(func: Callable[..., _T]) -> Callable[..., _T]:
"""Returns method assuring we read values (on demand) before we try to access them"""

@wraps(func)
def assure_data_present(self, *args: Any, **kwargs: Any) -> Any:
def assure_data_present(self: 'GitConfigParser', *args: Any, **kwargs: Any) -> _T:
self.read()
return func(self, *args, **kwargs)
# END wrapper method
return assure_data_present


def set_dirty_and_flush_changes(non_const_func: Callable) -> Callable:
def set_dirty_and_flush_changes(non_const_func: Callable[..., _T]) -> Callable[..., _T]:
"""Return method that checks whether given non constant function may be called.
If so, the instance will be set dirty.
Additionally, we flush the changes right to disk"""

def flush_changes(self, *args: Any, **kwargs: Any) -> Any:
def flush_changes(self: 'GitConfigParser', *args: Any, **kwargs: Any) -> _T:
rval = non_const_func(self, *args, **kwargs)
self._dirty = True
self.write()
@@ -356,7 +357,7 @@ def __enter__(self) -> 'GitConfigParser':
self._acquire_lock()
return self

def __exit__(self, exception_type, exception_value, traceback) -> None:
def __exit__(self, *args: Any) -> None:
self.release()

def release(self) -> None:
@@ -613,12 +614,15 @@ def read(self) -> None: # type: ignore[override]
def _write(self, fp: IO) -> None:
"""Write an .ini-format representation of the configuration state in
git compatible format"""
def write_section(name, section_dict):
def write_section(name: str, section_dict: _OMD) -> None:
fp.write(("[%s]\n" % name).encode(defenc))

values: Sequence[Union[str, bytes, int, float, bool]]
for (key, values) in section_dict.items_all():
if key == "__name__":
continue

v: Union[str, bytes, int, float, bool]
for v in values:
fp.write(("\t%s = %s\n" % (key, self._value_to_string(v).replace('\n', '\n\t'))).encode(defenc))
# END if key is not __name__
@@ -251,7 +251,7 @@ def read_cache(stream: IO[bytes]) -> Tuple[int, Dict[Tuple[PathLike, int], 'Inde
return (version, entries, extension_data, content_sha)


def write_tree_from_cache(entries: List[IndexEntry], odb, sl: slice, si: int = 0
def write_tree_from_cache(entries: List[IndexEntry], odb: 'GitCmdObjectDB', sl: slice, si: int = 0
) -> Tuple[bytes, List['TreeCacheTup']]:
"""Create a tree from the given sorted list of entries and put the respective
trees into the given object database
@@ -25,6 +25,7 @@
from .tree import Tree
from .blob import Blob
from .submodule.base import Submodule
from git.refs.reference import Reference

IndexObjUnion = Union['Tree', 'Blob', 'Submodule']

@@ -59,7 +60,7 @@ def __init__(self, repo: 'Repo', binsha: bytes):
assert len(binsha) == 20, "Require 20 byte binary sha, got %r, len = %i" % (binsha, len(binsha))

@classmethod
def new(cls, repo: 'Repo', id): # @ReservedAssignment
def new(cls, repo: 'Repo', id: Union[str, 'Reference']) -> Commit_ish:
"""
:return: New Object instance of a type appropriate to the object type behind
id. The id of the newly created object will be a binsha even though
@@ -282,7 +282,7 @@ def iter_items(cls, repo: 'Repo', rev: Union[str, 'Commit', 'SymbolicReference']
proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
return cls._iter_from_process_or_stream(repo, proc)

def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs) -> Iterator['Commit']:
def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = '', **kwargs: Any) -> Iterator['Commit']:
"""Iterate _all_ parents of this commit.
:param paths:
@@ -362,7 +362,7 @@ def _iter_from_process_or_stream(cls, repo: 'Repo', proc_or_stream: Union[Popen,
def create_from_tree(cls, repo: 'Repo', tree: Union[Tree, str], message: str,
parent_commits: Union[None, List['Commit']] = None, head: bool = False,
author: Union[None, Actor] = None, committer: Union[None, Actor] = None,
author_date: Union[None, str] = None, commit_date: Union[None, str] = None):
author_date: Union[None, str] = None, commit_date: Union[None, str] = None) -> 'Commit':
"""Commit the given tree, creating a commit object.
:param repo: Repo object the commit should be part of
@@ -403,7 +403,7 @@ def create_from_tree(cls, repo: 'Repo', tree: Union[Tree, str], message: str,
else:
for p in parent_commits:
if not isinstance(p, cls):
raise ValueError("Parent commit '%r' must be of type %s" % (p, cls))
raise ValueError(f"Parent commit '{p!r}' must be of type {cls}")
# end check parent commit types
# END if parent commits are unset

@@ -57,6 +57,7 @@
if TYPE_CHECKING:
from git.index import IndexFile
from git.repo import Repo
from git.refs import Head


# -----------------------------------------------------------------------------
@@ -265,7 +266,7 @@ def _module_abspath(cls, parent_repo: 'Repo', path: PathLike, name: str) -> Path
# end

@classmethod
def _clone_repo(cls, repo, url, path, name, **kwargs):
def _clone_repo(cls, repo: 'Repo', url: str, path: PathLike, name: str, **kwargs: Any) -> 'Repo':
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
@@ -279,7 +280,7 @@ def _clone_repo(cls, repo, url, path, name, **kwargs):
module_abspath_dir = osp.dirname(module_abspath)
if not osp.isdir(module_abspath_dir):
os.makedirs(module_abspath_dir)
module_checkout_path = osp.join(repo.working_tree_dir, path)
module_checkout_path = osp.join(str(repo.working_tree_dir), path)
# end

clone = git.Repo.clone_from(url, module_checkout_path, **kwargs)
@@ -484,7 +485,7 @@ def add(cls, repo: 'Repo', name: str, path: PathLike, url: Union[str, None] = No
def update(self, recursive: bool = False, init: bool = True, to_latest_revision: bool = False,
progress: Union['UpdateProgress', None] = None, dry_run: bool = False,
force: bool = False, keep_going: bool = False, env: Union[Mapping[str, str], None] = None,
clone_multi_options: Union[Sequence[TBD], None] = None):
clone_multi_options: Union[Sequence[TBD], None] = None) -> 'Submodule':
"""Update the repository of this submodule to point to the checkout
we point at with the binsha of this instance.
@@ -712,7 +713,7 @@ def update(self, recursive: bool = False, init: bool = True, to_latest_revision:
return self

@unbare_repo
def move(self, module_path, configuration=True, module=True):
def move(self, module_path: PathLike, configuration: bool = True, module: bool = True) -> 'Submodule':
"""Move the submodule to a another module path. This involves physically moving
the repository at our current path, changing the configuration, as well as
adjusting our index entry accordingly.
@@ -742,7 +743,7 @@ def move(self, module_path, configuration=True, module=True):
return self
# END handle no change

module_checkout_abspath = join_path_native(self.repo.working_tree_dir, module_checkout_path)
module_checkout_abspath = join_path_native(str(self.repo.working_tree_dir), module_checkout_path)
if osp.isfile(module_checkout_abspath):
raise ValueError("Cannot move repository onto a file: %s" % module_checkout_abspath)
# END handle target files
@@ -1160,7 +1161,7 @@ def exists(self) -> bool:
# END handle object state consistency

@property
def branch(self):
def branch(self) -> 'Head':
""":return: The branch instance that we are to checkout
:raise InvalidGitRepositoryError: if our module is not yet checked out"""
return mkhead(self.module(), self._branch_path)
@@ -375,8 +375,8 @@ def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
# END for each item
return False

def __reversed__(self):
return reversed(self._iter_convert_to_object(self._cache))
def __reversed__(self) -> Iterator[IndexObjUnion]:
return reversed(self._iter_convert_to_object(self._cache)) # type: ignore

def _serialize(self, stream: 'BytesIO') -> 'Tree':
"""Serialize this tree into the stream. Please note that we will assume
@@ -144,20 +144,20 @@ def __init__(self, secs_west_of_utc: float, name: Union[None, str] = None) -> No
def __reduce__(self) -> Tuple[Type['tzoffset'], Tuple[float, str]]:
return tzoffset, (-self._offset.total_seconds(), self._name)

def utcoffset(self, dt) -> timedelta:
def utcoffset(self, dt: Union[datetime, None]) -> timedelta:
return self._offset

def tzname(self, dt) -> str:
def tzname(self, dt: Union[datetime, None]) -> str:
return self._name

def dst(self, dt) -> timedelta:
def dst(self, dt: Union[datetime, None]) -> timedelta:
return ZERO


utc = tzoffset(0, 'UTC')


def from_timestamp(timestamp, tz_offset: float) -> datetime:
def from_timestamp(timestamp: float, tz_offset: float) -> datetime:
"""Converts a timestamp + tz_offset into an aware datetime instance."""
utc_dt = datetime.fromtimestamp(timestamp, utc)
try:
@@ -305,7 +305,7 @@ class Traversable(Protocol):

@classmethod
@abstractmethod
def _get_intermediate_items(cls, item) -> Sequence['Traversable']:
def _get_intermediate_items(cls, item: Any) -> Sequence['Traversable']:
"""
Returns:
Tuple of items connected to the given item.
@@ -327,7 +327,7 @@ def list_traverse(self, *args: Any, **kwargs: Any) -> Any:
stacklevel=2)
return self._list_traverse(*args, **kwargs)

def _list_traverse(self, as_edge=False, *args: Any, **kwargs: Any
def _list_traverse(self, as_edge: bool = False, *args: Any, **kwargs: Any
) -> IterableList[Union['Commit', 'Submodule', 'Tree', 'Blob']]:
"""
:return: IterableList with the results of the traversal as produced by

0 comments on commit 05825dc

Please sign in to comment.