Skip to content
77 changes: 72 additions & 5 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from io import BytesIO

# python additional imports
import google_crc32c

import pytest

# current library imports
Expand All @@ -28,6 +30,11 @@
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"


def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
step = (b - a) // 3
return a + step, a + 2 * step


async def write_one_appendable_object(
bucket_name: str,
object_name: str,
Expand Down Expand Up @@ -59,11 +66,21 @@ def appendable_object(storage_client, blobs_to_delete):


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
256, # less than _chunk size
10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES
20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE
],
)
@pytest.mark.parametrize(
"attempt_direct_path",
[True, False],
)
async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
async def test_basic_wrd(
storage_client, blobs_to_delete, attempt_direct_path, object_size
):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of fixture because.
Expand All @@ -74,22 +91,72 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient(attempt_direct_path=attempt_direct_path).grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
await writer.append(_BYTES_TO_UPLOAD)
await writer.append(object_data)
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == len(_BYTES_TO_UPLOAD)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == _BYTES_TO_UPLOAD
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


@pytest.mark.asyncio
@pytest.mark.parametrize(
"object_size",
[
10, # less than _chunk size,
10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES
20 * 1024 * 1024, # greater than _MAX_BUFFER_SIZE_BYTES
],
)
async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size):
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"

# Client instantiation; it cannot be part of fixture because.
# grpc_client's event loop and event loop of coroutine running it
# (i.e. this test) must be same.
# Note:
# 1. @pytest.mark.asyncio ensures new event loop for each test.
# 2. we can keep the same event loop for entire module but that may
# create issues if tests are run in parallel and one test hogs the event
# loop slowing down other tests.
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)
grpc_client = AsyncGrpcClient().grpc_client

writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
await writer.open()
mark1, mark2 = _get_equal_dist(0, object_size)
await writer.append(object_data[0:mark1])
await writer.append(object_data[mark1:mark2])
await writer.append(object_data[mark2:])
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
buffer = BytesIO()
await mrd.open()
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
await mrd.close()
assert buffer.getvalue() == object_data
assert mrd.persisted_size == object_size

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
Expand Down