Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit 8093bf7

Browse files
committed
Fix diff_tables for non-threaded ('serial'); Fix for error reporting
1 parent f7ee60f commit 8093bf7

File tree

3 files changed

+20
-8
lines changed

3 files changed

+20
-8
lines changed

data_diff/__main__.py

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,19 +5,18 @@
55
import logging
66
from itertools import islice
77

8-
from data_diff.tracking import disable_tracking
8+
import rich
9+
import click
910

10-
from .utils import remove_password_from_url, safezip, match_like
1111

12+
from .utils import remove_password_from_url, safezip, match_like
1213
from .diff_tables import TableDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
1314
from .table_segment import create_schema, TableSegment
14-
1515
from .databases.connect import connect
1616
from .parse_time import parse_time_before_now, UNITS_STR, ParseError
1717
from .config import apply_config_from_file
18+
from .tracking import disable_tracking
1819

19-
import rich
20-
import click
2120

2221
LOG_FORMAT = "[%(asctime)s] %(levelname)s - %(message)s"
2322
DATE_FORMAT = "%H:%M:%S"

data_diff/diff_tables.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
from numbers import Number
77
from operator import attrgetter, methodcaller
88
from collections import defaultdict
9-
from typing import List, Tuple, Iterator, Optional
9+
from typing import Tuple, Iterator, Optional
1010
import logging
1111
from concurrent.futures import ThreadPoolExecutor, as_completed
1212

@@ -95,6 +95,7 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
9595

9696
self.stats["diff_count"] = 0
9797
start = time.monotonic()
98+
error = None
9899
try:
99100

100101
# Query and validate schema
@@ -137,7 +138,6 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
137138
post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)]
138139
yield from self._bisect_and_diff_tables(*post_tables)
139140

140-
error = None
141141
except BaseException as e: # Catch KeyboardInterrupt too
142142
error = e
143143
finally:
@@ -308,7 +308,8 @@ def _threaded_call(self, func, iterable):
308308

309309
def _thread_as_completed(self, func, iterable):
310310
if not self.threaded:
311-
return map(func, iterable)
311+
yield from map(func, iterable)
312+
return
312313

313314
with ThreadPoolExecutor(max_workers=self.max_threadpool_size) as task_pool:
314315
futures = [task_pool.submit(func, item) for item in iterable]

tests/test_diff_tables.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -293,6 +293,18 @@ def test_diff_small_tables(self):
293293
self.assertEqual(2, self.differ.stats["table1_count"])
294294
self.assertEqual(1, self.differ.stats["table2_count"])
295295

296+
def test_non_threaded(self):
297+
differ = TableDiffer(3, 4, threaded=False)
298+
299+
time = "2022-01-01 00:00:00"
300+
time_str = f"timestamp '{time}'"
301+
cols = "id userid movieid rating timestamp".split()
302+
_insert_row(self.connection, self.table_src, cols, [1, 1, 1, 9, time_str])
303+
_insert_rows(self.connection, self.table_dst, cols, [[1, 1, 1, 9, time_str]])
304+
_commit(self.connection)
305+
diff = list(differ.diff_tables(self.table, self.table2))
306+
self.assertEqual(diff, [])
307+
296308
def test_diff_table_above_bisection_threshold(self):
297309
time = "2022-01-01 00:00:00"
298310
time_str = f"timestamp '{time}'"

0 commit comments

Comments
 (0)