diff --git a/tests/system/test_zonal.py b/tests/system/test_zonal.py
index 05bb317d7..9916fa21f 100644
--- a/tests/system/test_zonal.py
+++ b/tests/system/test_zonal.py
@@ -5,6 +5,8 @@
 from io import BytesIO
 
 # python additional imports
+import google_crc32c
+
 import pytest
 
 # current library imports
@@ -28,6 +30,11 @@
 _BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
 
 
+def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
+    step = (b - a) // 3
+    return a + step, a + 2 * step
+
+
 async def write_one_appendable_object(
     bucket_name: str,
     object_name: str,
@@ -59,11 +66,21 @@ def appendable_object(storage_client, blobs_to_delete):
 
 
 @pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "object_size",
+    [
+        256,  # less than _chunk size
+        10 * 1024 * 1024,  # less than _MAX_BUFFER_SIZE_BYTES
+        20 * 1024 * 1024,  # greater than _MAX_BUFFER_SIZE_BYTES
+    ],
+)
 @pytest.mark.parametrize(
     "attempt_direct_path",
     [True, False],
 )
-async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
+async def test_basic_wrd(
+    storage_client, blobs_to_delete, attempt_direct_path, object_size
+):
     object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
 
     # Client instantiation; it cannot be part of fixture because.
@@ -74,13 +91,16 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
     # 2. we can keep the same event loop for entire module but that may
     # create issues if tests are run in parallel and one test hogs the event
     # loop slowing down other tests.
+    object_data = os.urandom(object_size)
+    object_checksum = google_crc32c.value(object_data)
     grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client
 
     writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
     await writer.open()
-    await writer.append(_BYTES_TO_UPLOAD)
+    await writer.append(object_data)
     object_metadata = await writer.close(finalize_on_close=True)
-    assert object_metadata.size == len(_BYTES_TO_UPLOAD)
+    assert object_metadata.size == object_size
+    assert int(object_metadata.checksums.crc32c) == object_checksum
 
     mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
     buffer = BytesIO()
@@ -88,8 +108,55 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
     # (0, 0) means read the whole object
     await mrd.download_ranges([(0, 0, buffer)])
     await mrd.close()
-    assert buffer.getvalue() == _BYTES_TO_UPLOAD
-    assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
+    assert buffer.getvalue() == object_data
+    assert mrd.persisted_size == object_size
+
+    # Clean up; use json client (i.e. `storage_client` fixture) to delete.
+    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "object_size",
+    [
+        10,  # less than _chunk size
+        10 * 1024 * 1024,  # less than _MAX_BUFFER_SIZE_BYTES
+        20 * 1024 * 1024,  # greater than _MAX_BUFFER_SIZE_BYTES
+    ],
+)
+async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size):
+    object_name = f"test_basic_wrd_in_slices-{str(uuid.uuid4())}"
+
+    # Client instantiation; it cannot be part of fixture because.
+    # grpc_client's event loop and event loop of coroutine running it
+    # (i.e. this test) must be same.
+    # Note:
+    # 1. @pytest.mark.asyncio ensures new event loop for each test.
+    # 2. we can keep the same event loop for entire module but that may
+    # create issues if tests are run in parallel and one test hogs the event
+    # loop slowing down other tests.
+    object_data = os.urandom(object_size)
+    object_checksum = google_crc32c.value(object_data)
+    grpc_client = AsyncGrpcClient().grpc_client
+
+    writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
+    await writer.open()
+    mark1, mark2 = _get_equal_dist(0, object_size)
+    await writer.append(object_data[0:mark1])
+    await writer.append(object_data[mark1:mark2])
+    await writer.append(object_data[mark2:])
+    object_metadata = await writer.close(finalize_on_close=True)
+    assert object_metadata.size == object_size
+    assert int(object_metadata.checksums.crc32c) == object_checksum
+
+    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
+    buffer = BytesIO()
+    await mrd.open()
+    # (0, 0) means read the whole object
+    await mrd.download_ranges([(0, 0, buffer)])
+    await mrd.close()
+    assert buffer.getvalue() == object_data
+    assert mrd.persisted_size == object_size
 
     # Clean up; use json client (i.e. `storage_client` fixture) to delete.
     blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
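For reviewers, a minimal standalone sketch (not part of the patch) of what the new _get_equal_dist helper computes: two cut points that split the range [a, b) into three roughly equal slices, which is why the three append() calls in test_basic_wrd_in_slices reconstruct the whole payload. Only the helper body is taken from the diff; the payload below is arbitrary illustrative data.

    def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
        step = (b - a) // 3
        return a + step, a + 2 * step

    # Arbitrary payload whose length is not divisible by 3.
    data = bytes(range(256)) * 40 + b"tail"
    mark1, mark2 = _get_equal_dist(0, len(data))
    parts = [data[0:mark1], data[mark1:mark2], data[mark2:]]
    assert 0 <= mark1 <= mark2 <= len(data)
    # The three slices cover the payload exactly; the remainder of the
    # integer division is absorbed by the final slice.
    assert b"".join(parts) == data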