Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix(cmd): don't open stdout when fetching
This allows us to use the main thread to parse stderr to get progress,
and resolve assertion failures hopefully once and for all.

Relates to #301
  • Loading branch information
Byron committed Jul 3, 2015
1 parent 36dbe7e commit 369e564
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 96 deletions.
155 changes: 75 additions & 80 deletions git/cmd.py
Expand Up @@ -41,7 +41,7 @@

execute_kwargs = ('istream', 'with_keep_cwd', 'with_extended_output',
'with_exceptions', 'as_process', 'stdout_as_string',
'output_stream')
'output_stream', 'with_stdout')

log = logging.getLogger('git.cmd')
log.addHandler(logging.NullHandler())
Expand All @@ -65,84 +65,6 @@ def _bchr(c):
# Documentation
## @{

def _parse_lines_from_buffer(buf):
line = b''
bi = 0
lb = len(buf)
while bi < lb:
char = _bchr(buf[bi])
bi += 1

if char in (b'\r', b'\n') and line:
yield bi, line
line = b''
else:
line += char
# END process parsed line
# END while file is not done reading
# end

def _read_lines_from_fno(fno, last_buf_list):
buf = os.read(fno, mmap.PAGESIZE)
buf = last_buf_list[0] + buf

bi = 0
for bi, line in _parse_lines_from_buffer(buf):
yield line
# for each line to parse from the buffer

# keep remainder
last_buf_list[0] = buf[bi:]

def _dispatch_single_line(line, handler):
line = line.decode(defenc)
if line and handler:
try:
handler(line)
except Exception:
# Keep reading, have to pump the lines empty nontheless
log.error("Line handler exception on line: %s", line, exc_info=True)
# end
# end dispatch helper
# end single line helper

def _dispatch_lines(fno, handler, buf_list):
lc = 0
last_buf = buf_list[0]
while True:
for line in _read_lines_from_fno(fno, buf_list):
_dispatch_single_line(line, handler)
lc += 1
# for each line

if last_buf == buf_list[0]:
break

last_buf = buf_list[0]
# end endless loop
return lc
# end

def _deplete_buffer(fno, handler, buf_list, wg=None):
lc = 0
while True:
line_count = _dispatch_lines(fno, handler, buf_list)
lc += line_count
if line_count == 0:
break
# end deplete buffer

if buf_list[0]:
_dispatch_single_line(buf_list[0], handler)
lc += 1
# end

if wg:
wg.done()

return lc
# end

def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
"""Registers for notifications to lean that process output is ready to read, and dispatches lines to
the respective line handlers. We are able to handle carriage returns in case progress is sent by that
Expand All @@ -156,6 +78,76 @@ def handle_process_output(process, stdout_handler, stderr_handler, finalizer):
fdmap = {process.stdout.fileno(): (stdout_handler, [b'']),
process.stderr.fileno(): (stderr_handler, [b''])}

def _parse_lines_from_buffer(buf):
line = b''
bi = 0
lb = len(buf)
while bi < lb:
char = _bchr(buf[bi])
bi += 1

if char in (b'\r', b'\n') and line:
yield bi, line
line = b''
else:
line += char
# END process parsed line
# END while file is not done reading
# end

def _read_lines_from_fno(fno, last_buf_list):
buf = os.read(fno, mmap.PAGESIZE)
buf = last_buf_list[0] + buf

bi = 0
for bi, line in _parse_lines_from_buffer(buf):
yield line
# for each line to parse from the buffer

# keep remainder
last_buf_list[0] = buf[bi:]

def _dispatch_single_line(line, handler):
line = line.decode(defenc)
if line and handler:
try:
handler(line)
except Exception:
# Keep reading, have to pump the lines empty nontheless
log.error("Line handler exception on line: %s", line, exc_info=True)
# end
# end dispatch helper
# end single line helper

def _dispatch_lines(fno, handler, buf_list):
lc = 0
for line in _read_lines_from_fno(fno, buf_list):
_dispatch_single_line(line, handler)
lc += 1
# for each line
return lc
# end

def _deplete_buffer(fno, handler, buf_list, wg=None):
lc = 0
while True:
line_count = _dispatch_lines(fno, handler, buf_list)
lc += line_count
if line_count == 0:
break
# end deplete buffer

if buf_list[0]:
_dispatch_single_line(buf_list[0], handler)
lc += 1
# end

if wg:
wg.done()

return lc
# end

if hasattr(select, 'poll'):
# poll is preferred, as select is limited to file handles up to 1024 ... . This could otherwise be
# an issue for us, as it matters how many handles our own process has
Expand Down Expand Up @@ -483,6 +475,7 @@ def execute(self, command,
as_process=False,
output_stream=None,
stdout_as_string=True,
with_stdout=True,
**subprocess_kwargs
):
"""Handles executing the command on the shell and consumes and returns
Expand Down Expand Up @@ -536,6 +529,8 @@ def execute(self, command,
some of the valid kwargs are already set by this method, the ones you
specify may not be the same ones.
:param with_stdout: If True, default True, we open stdout on the created process
:return:
* str(output) if extended_output = False (Default)
* tuple(int(status), str(stdout), str(stderr)) if extended_output = True
Expand Down Expand Up @@ -586,7 +581,7 @@ def execute(self, command,
cwd=cwd,
stdin=istream,
stderr=PIPE,
stdout=PIPE,
stdout=with_stdout and PIPE or None,
shell=self.USE_SHELL,
close_fds=(os.name == 'posix'), # unsupported on windows
**subprocess_kwargs
Expand Down
9 changes: 4 additions & 5 deletions git/remote.py
Expand Up @@ -550,7 +550,7 @@ def _get_fetch_info_from_stderr(self, proc, progress):

progress_handler = progress.new_message_handler()

def my_progress_handler(line):
for line in proc.stderr.readlines():
for pline in progress_handler(line):
if line.startswith('fatal:') or line.startswith('error:'):
raise GitCommandError(("Error when fetching: %s" % line,), 2)
Expand All @@ -563,9 +563,7 @@ def my_progress_handler(line):
# end for each comand code we know
# end for each line progress didn't handle
# end

# We are only interested in stderr here ...
handle_process_output(proc, None, my_progress_handler, finalize_process)
finalize_process(proc)

# read head information
fp = open(join(self.repo.git_dir, 'FETCH_HEAD'), 'rb')
Expand Down Expand Up @@ -651,7 +649,8 @@ def fetch(self, refspec=None, progress=None, **kwargs):
else:
args = [refspec]

proc = self.repo.git.fetch(self, *args, with_extended_output=True, as_process=True, v=True, **kwargs)
proc = self.repo.git.fetch(self, *args, with_extended_output=True, as_process=True, with_stdout=True, v=True,
**kwargs)
res = self._get_fetch_info_from_stderr(proc, progress or RemoteProgress())
if hasattr(self.repo.odb, 'update_cache'):
self.repo.odb.update_cache()
Expand Down
5 changes: 5 additions & 0 deletions git/test/fixtures/cat_file.py
@@ -0,0 +1,5 @@
import sys

for line in open(sys.argv[1]).readlines():
sys.stdout.write(line)
sys.stderr.write(line)
32 changes: 21 additions & 11 deletions git/test/test_git.py
Expand Up @@ -5,7 +5,9 @@
# This module is part of GitPython and is released under
# the BSD License: http://www.opensource.org/licenses/bsd-license.php
import os
import sys
import mock
import subprocess

from git.test.lib import (
TestBase,
Expand All @@ -22,7 +24,6 @@
GitCommandNotFound,
Repo
)
from git.cmd import _deplete_buffer
from gitdb.test.lib import with_rw_directory

from git.compat import PY3
Expand Down Expand Up @@ -206,16 +207,25 @@ def test_environment(self, rw_dir):
# end
# end if select.poll exists

def test_dispatch_lines(self):
def test_handle_process_output(self):
from git.cmd import handle_process_output

line_count = 5002
count = [0]
def counter(line):
count[0] += 1
count = [None, 0, 0]

def counter_stdout(line):
count[1] += 1

def counter_stderr(line):
count[2] += 1

proc = subprocess.Popen([sys.executable, fixture_path('cat_file.py'), str(fixture_path('issue-301_stderr'))],
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False)

fd = os.open(fixture_path('issue-301_stderr'), os.O_RDONLY)
buf_list = [b'']
lines_parsed = _deplete_buffer(fd, counter, buf_list)
os.close(fd)
handle_process_output(proc, counter_stdout, counter_stderr, lambda proc: proc.wait())

assert lines_parsed == line_count
assert count[0] == line_count
assert count[1] == line_count
assert count[2] == line_count

0 comments on commit 369e564

Please sign in to comment.