Skip to content

Commit

Permalink
feat: cov.collect() context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
nedbat committed Aug 12, 2023
1 parent da7ee52 commit 5937a62
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 106 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ development at the same time, such as 4.5.x and 5.0.
Unreleased
----------

- Added a :meth:`.Coverage.collect` context manager to start and stop coverage
data collection.

- Dropped support for Python 3.7.

- Fix: in unusual circumstances, SQLite cannot be set to asynchronous mode.
Expand Down
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ Documentation is on `Read the Docs`_. Code repository and issue tracker are on
.. _GitHub: https://github.com/nedbat/coveragepy

**New in 7.x:**
dropped support for Python 3.7;
added ``Coverage.collect()`` context manager;
improved data combining;
``[run] exclude_also`` setting;
``report --format=``;
Expand Down
25 changes: 24 additions & 1 deletion coverage/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ class Coverage(TConfigurable):
cov.stop()
cov.html_report(directory="covhtml")
A context manager is available to do the same thing::
cov = Coverage()
with cov.collect():
#.. call your code ..
cov.html_report(directory="covhtml")
Note: in keeping with Python custom, names starting with underscore are
not part of the public API. They might stop working at any point. Please
limit yourself to documented methods to avoid problems.
Expand Down Expand Up @@ -607,13 +614,16 @@ def _init_data(self, suffix: Optional[Union[str, bool]]) -> None:
def start(self) -> None:
"""Start measuring code coverage.
Coverage measurement only occurs in functions called after
Coverage measurement is only collected in functions called after
:meth:`start` is invoked. Statements in the same scope as
:meth:`start` won't be measured.
Once you invoke :meth:`start`, you must also call :meth:`stop`
eventually, or your process might not shut down cleanly.
The :meth:`collect` method is a context manager to handle both
starting and stopping collection.
"""
self._init()
if not self._inited_for_start:
Expand Down Expand Up @@ -649,6 +659,19 @@ def stop(self) -> None:
self._collector.stop()
self._started = False

@contextlib.contextmanager
def collect(self) -> Iterator[None]:
"""A context manager to start/stop coverage measurement collection.
.. versionadded:: 7.3
"""
self.start()
try:
yield
finally:
self.stop()

def _atexit(self, event: str = "atexit") -> None:
"""Clean up on process shutdown."""
if self._debug.should("process"):
Expand Down
2 changes: 1 addition & 1 deletion coverage/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

# version_info: same semantics as sys.version_info.
# _dev: the .devN suffix if any.
version_info = (7, 2, 8, "alpha", 0)
version_info = (7, 3, 0, "alpha", 0)
_dev = 1


Expand Down
1 change: 1 addition & 0 deletions metacov.ini
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ exclude_lines =
#
pragma: nested
cov.stop\(\)
with cov.collect\(\):

# Lines that are only executed when we are debugging coverage.py.
def __repr__
Expand Down
9 changes: 2 additions & 7 deletions tests/coveragetest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,9 @@ def start_import_stop(
The imported module is returned.
"""
cov.start()
try: # pragma: nested
with cov.collect():
# Import the Python file, executing it.
mod = import_local_file(modname, modfile)
finally: # pragma: nested
# Stop coverage.py.
cov.stop()
return mod
return import_local_file(modname, modfile)

def get_report(self, cov: Coverage, squeeze: bool = True, **kwargs: Any) -> str:
"""Get the report from `cov`, and canonicalize it."""
Expand Down
101 changes: 40 additions & 61 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,8 @@ def f1() -> None: # pragma: nested

def run_one_function(f: Callable[[], None]) -> None:
cov.erase()
cov.start()
f() # pragma: nested
cov.stop()
with cov.collect():
f()

fs = cov.get_data().measured_files()
lines.append(cov.get_data().lines(list(fs)[0]))
Expand Down Expand Up @@ -363,30 +362,26 @@ def test_start_stop_start_stop(self) -> None:
def test_start_save_stop(self) -> None:
self.make_code1_code2()
cov = coverage.Coverage()
cov.start()
import_local_file("code1") # pragma: nested
cov.save() # pragma: nested
import_local_file("code2") # pragma: nested
cov.stop()
with cov.collect():
import_local_file("code1")
cov.save()
import_local_file("code2")
self.check_code1_code2(cov)

def test_start_save_nostop(self) -> None:
self.make_code1_code2()
cov = coverage.Coverage()
cov.start()
import_local_file("code1") # pragma: nested
cov.save() # pragma: nested
import_local_file("code2") # pragma: nested
self.check_code1_code2(cov) # pragma: nested
# Then stop it, or the test suite gets out of whack.
cov.stop()
with cov.collect():
import_local_file("code1")
cov.save()
import_local_file("code2")
self.check_code1_code2(cov)

def test_two_getdata_only_warn_once(self) -> None:
self.make_code1_code2()
cov = coverage.Coverage(source=["."], omit=["code1.py"])
cov.start()
import_local_file("code1") # pragma: nested
cov.stop()
with cov.collect():
import_local_file("code1")
# We didn't collect any data, so we should get a warning.
with self.assert_warnings(cov, ["No data was collected"]):
cov.get_data()
Expand All @@ -398,17 +393,15 @@ def test_two_getdata_only_warn_once(self) -> None:
def test_two_getdata_warn_twice(self) -> None:
self.make_code1_code2()
cov = coverage.Coverage(source=["."], omit=["code1.py", "code2.py"])
cov.start()
import_local_file("code1") # pragma: nested
# We didn't collect any data, so we should get a warning.
with self.assert_warnings(cov, ["No data was collected"]): # pragma: nested
cov.save() # pragma: nested
import_local_file("code2") # pragma: nested
# Calling get_data a second time after tracing some more will warn again.
with self.assert_warnings(cov, ["No data was collected"]): # pragma: nested
cov.get_data() # pragma: nested
# Then stop it, or the test suite gets out of whack.
cov.stop()
with cov.collect():
import_local_file("code1")
# We didn't collect any data, so we should get a warning.
with self.assert_warnings(cov, ["No data was collected"]):
cov.save()
import_local_file("code2")
# Calling get_data a second time after tracing some more will warn again.
with self.assert_warnings(cov, ["No data was collected"]):
cov.get_data()

def make_good_data_files(self) -> None:
"""Make some good data files."""
Expand Down Expand Up @@ -632,9 +625,7 @@ def test_switch_context_testrunner(self) -> None:

# Test runner starts
cov = coverage.Coverage()
cov.start()

if "pragma: nested":
with cov.collect():
# Imports the test suite
suite = import_local_file("testsuite")

Expand All @@ -648,7 +639,6 @@ def test_switch_context_testrunner(self) -> None:

# Runner finishes
cov.save()
cov.stop()

# Labeled data is collected
data = cov.get_data()
Expand All @@ -670,9 +660,7 @@ def test_switch_context_with_static(self) -> None:

# Test runner starts
cov = coverage.Coverage(context="mysuite")
cov.start()

if "pragma: nested":
with cov.collect():
# Imports the test suite
suite = import_local_file("testsuite")

Expand All @@ -686,7 +674,6 @@ def test_switch_context_with_static(self) -> None:

# Runner finishes
cov.save()
cov.stop()

# Labeled data is collected
data = cov.get_data()
Expand All @@ -704,12 +691,11 @@ def test_switch_context_with_static(self) -> None:
def test_dynamic_context_conflict(self) -> None:
cov = coverage.Coverage(source=["."])
cov.set_option("run:dynamic_context", "test_function")
cov.start()
with pytest.warns(Warning) as warns: # pragma: nested
# Switch twice, but only get one warning.
cov.switch_context("test1")
cov.switch_context("test2")
cov.stop()
with cov.collect():
with pytest.warns(Warning) as warns:
# Switch twice, but only get one warning.
cov.switch_context("test1")
cov.switch_context("test2")
assert_coverage_warnings(warns, "Conflicting dynamic contexts (dynamic-conflict)")

def test_unknown_dynamic_context(self) -> None:
Expand All @@ -725,10 +711,9 @@ def test_switch_context_unstarted(self) -> None:
with pytest.raises(CoverageException, match=msg):
cov.switch_context("test1")

cov.start()
cov.switch_context("test2") # pragma: nested
with cov.collect():
cov.switch_context("test2")

cov.stop()
with pytest.raises(CoverageException, match=msg):
cov.switch_context("test3")

Expand All @@ -750,9 +735,8 @@ def test_config_crash_no_crash(self) -> None:
def test_run_debug_sys(self) -> None:
# https://github.com/nedbat/coveragepy/issues/907
cov = coverage.Coverage()
cov.start()
d = dict(cov.sys_info()) # pragma: nested
cov.stop()
with cov.collect():
d = dict(cov.sys_info())
assert cast(str, d['data_file']).endswith(".coverage")


Expand All @@ -779,12 +763,10 @@ def test_current(self) -> None:
self.assert_current_is_none(cur1)
assert cur0 is cur1
# Starting the instance makes it current.
cov.start()
if "# pragma: nested":
with cov.collect():
cur2 = coverage.Coverage.current()
assert cur2 is cov
# Stopping the instance makes current None again.
cov.stop()

cur3 = coverage.Coverage.current()
self.assert_current_is_none(cur3)
Expand Down Expand Up @@ -900,9 +882,8 @@ def coverage_usepkgs_counts(self, **kwargs: TCovKwargs) -> Dict[str, int]:
"""
cov = coverage.Coverage(**kwargs)
cov.start()
import usepkgs # pragma: nested # pylint: disable=import-error, unused-import
cov.stop()
with cov.collect():
import usepkgs # pylint: disable=import-error, unused-import
with self.assert_warnings(cov, []):
data = cov.get_data()
summary = line_counts(data)
Expand Down Expand Up @@ -993,9 +974,8 @@ class ReportIncludeOmitTest(IncludeOmitTestsMixin, CoverageTest):
def coverage_usepkgs(self, **kwargs: TCovKwargs) -> Iterable[str]:
"""Try coverage.report()."""
cov = coverage.Coverage()
cov.start()
import usepkgs # pragma: nested # pylint: disable=import-error, unused-import
cov.stop()
with cov.collect():
import usepkgs # pylint: disable=import-error, unused-import
report = io.StringIO()
cov.report(file=report, **kwargs)
return report.getvalue()
Expand All @@ -1012,9 +992,8 @@ class XmlIncludeOmitTest(IncludeOmitTestsMixin, CoverageTest):
def coverage_usepkgs(self, **kwargs: TCovKwargs) -> Iterable[str]:
"""Try coverage.xml_report()."""
cov = coverage.Coverage()
cov.start()
import usepkgs # pragma: nested # pylint: disable=import-error, unused-import
cov.stop()
with cov.collect():
import usepkgs # pylint: disable=import-error, unused-import
cov.xml_report(outfile="-", **kwargs)
return self.stdout()

Expand Down
25 changes: 10 additions & 15 deletions tests/test_concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,13 +624,11 @@ def run_thread() -> None: # pragma: nested
has_stopped_coverage.append(ident)

cov = coverage.Coverage()
cov.start()
with cov.collect():
t = threading.Thread(target=run_thread)
t.start()

t = threading.Thread(target=run_thread) # pragma: nested
t.start() # pragma: nested

time.sleep(0.1) # pragma: nested
cov.stop() # pragma: nested
time.sleep(0.1)
t.join()

assert has_started_coverage == [t.ident]
Expand Down Expand Up @@ -672,16 +670,13 @@ def random_load() -> None: # pragma: nested
duration = 0.01
for _ in range(3):
cov = coverage.Coverage()
cov.start()

threads = [threading.Thread(target=random_load) for _ in range(10)] # pragma: nested
should_run[0] = True # pragma: nested
for t in threads: # pragma: nested
t.start()

time.sleep(duration) # pragma: nested
with cov.collect():
threads = [threading.Thread(target=random_load) for _ in range(10)]
should_run[0] = True
for t in threads:
t.start()

cov.stop() # pragma: nested
time.sleep(duration)

# The following call used to crash with running background threads.
cov.get_data()
Expand Down
9 changes: 4 additions & 5 deletions tests/test_oddball.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,10 @@ def doit(calls):
calls = [getattr(sys.modules[cn], cn) for cn in callnames_list]

cov = coverage.Coverage()
cov.start()
# Call our list of functions: invoke the first, with the rest as
# an argument.
calls[0](calls[1:]) # pragma: nested
cov.stop() # pragma: nested
with cov.collect():
# Call our list of functions: invoke the first, with the rest as
# an argument.
calls[0](calls[1:])

# Clean the line data and compare to expected results.
# The file names are absolute, so keep just the base.
Expand Down
Loading

0 comments on commit 5937a62

Please sign in to comment.