google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
@@ -24,6 +24,7 @@
from typing import Optional, Union

from google_crc32c import Checksum
+from google.api_core import exceptions

from ._utils import raise_if_no_fast_crc32c
from google.cloud import _storage_v2
@@ -36,7 +37,7 @@


_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024 # 2 MiB
-_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024  # 16 MiB
+_DEFAULT_FLUSH_INTERVAL_BYTES = 16 * 1024 * 1024  # 16 MiB


class AsyncAppendableObjectWriter:
@@ -49,6 +50,7 @@ def __init__(
object_name: str,
generation=None,
write_handle=None,
+        writer_options: Optional[dict] = None,
):
"""
Class for appending data to a GCS Appendable Object.
@@ -125,6 +127,21 @@ def __init__(
# Please note: `offset` and `persisted_size` are same when the stream is
# opened.
self.persisted_size: Optional[int] = None
+        if writer_options is None:
+            writer_options = {}
+        self.flush_interval = writer_options.get(
+            "FLUSH_INTERVAL_BYTES", _DEFAULT_FLUSH_INTERVAL_BYTES
+        )
+        # TODO: add test case for this.
+        if self.flush_interval < _MAX_CHUNK_SIZE_BYTES:
+            raise exceptions.OutOfRange(
+                f"flush_interval must be >= {_MAX_CHUNK_SIZE_BYTES}, but got {self.flush_interval}"
+            )
+        if self.flush_interval % _MAX_CHUNK_SIZE_BYTES != 0:
+            raise exceptions.OutOfRange(
+                f"flush_interval must be a multiple of {_MAX_CHUNK_SIZE_BYTES}, but got {self.flush_interval}"
+            )
+        self.bytes_appended_since_last_flush = 0

async def state_lookup(self) -> int:
"""Returns the persisted_size
@@ -193,7 +210,6 @@ async def append(self, data: bytes) -> None:
self.offset = self.persisted_size

start_idx = 0
-        bytes_to_flush = 0
while start_idx < total_bytes:
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
data_chunk = data[start_idx:end_idx]
@@ -208,10 +224,10 @@
)
chunk_size = end_idx - start_idx
self.offset += chunk_size
-            bytes_to_flush += chunk_size
-            if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
+            self.bytes_appended_since_last_flush += chunk_size
+            if self.bytes_appended_since_last_flush >= self.flush_interval:
await self.simple_flush()
-                bytes_to_flush = 0
+                self.bytes_appended_since_last_flush = 0
start_idx = end_idx

async def simple_flush(self) -> None:
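For orientation, a minimal usage sketch of the new option, distilled from the constructor and append() logic above. The bucket and object names are placeholders and the 4 MiB interval is only an example; the constructor requires the interval to be a multiple of the 2 MiB chunk size, and at least one chunk.

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
    AsyncAppendableObjectWriter,
)


async def append_with_custom_flush_interval() -> None:
    # Placeholders: any zonal bucket / object name works here.
    grpc_client = AsyncGrpcClient().grpc_client
    writer = AsyncAppendableObjectWriter(
        grpc_client,
        "my-zonal-bucket",
        "my-object",
        writer_options={"FLUSH_INTERVAL_BYTES": 4 * 1024 * 1024},
    )
    await writer.open()
    # append() sends 2 MiB chunks and calls simple_flush() each time 4 MiB
    # have accumulated since the last flush: 9 MiB of data -> two flushes,
    # with the trailing 1 MiB not explicitly flushed until close().
    await writer.append(b"x" * (9 * 1024 * 1024))
    await writer.close(finalize_on_close=True)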
54 changes: 54 additions & 0 deletions tests/system/test_zonal.py
@@ -13,6 +13,7 @@
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
+    _DEFAULT_FLUSH_INTERVAL_BYTES,
)
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
@@ -162,6 +163,59 @@ async def test_basic_wrd_in_slices(storage_client, blobs_to_delete, object_size)
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))


+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "flush_interval",
+    [2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024, _DEFAULT_FLUSH_INTERVAL_BYTES],
+)
+async def test_wrd_with_non_default_flush_interval(
+    storage_client,
+    blobs_to_delete,
+    flush_interval,
+):
+    object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
+    object_size = 9 * 1024 * 1024
+
+    # Client instantiation; it cannot be part of a fixture because the
+    # grpc_client's event loop and the event loop of the coroutine running it
+    # (i.e. this test) must be the same.
+    # Note:
+    # 1. @pytest.mark.asyncio ensures a new event loop for each test.
+    # 2. we could keep the same event loop for the entire module, but that may
+    #    create issues if tests are run in parallel and one test hogs the event
+    #    loop, slowing down other tests.
+    object_data = os.urandom(object_size)
+    object_checksum = google_crc32c.value(object_data)
+    grpc_client = AsyncGrpcClient().grpc_client
+
+    writer = AsyncAppendableObjectWriter(
+        grpc_client,
+        _ZONAL_BUCKET,
+        object_name,
+        writer_options={"FLUSH_INTERVAL_BYTES": flush_interval},
+    )
+    await writer.open()
+    mark1, mark2 = _get_equal_dist(0, object_size)
+    await writer.append(object_data[0:mark1])
+    await writer.append(object_data[mark1:mark2])
+    await writer.append(object_data[mark2:])
+    object_metadata = await writer.close(finalize_on_close=True)
+    assert object_metadata.size == object_size
+    assert int(object_metadata.checksums.crc32c) == object_checksum
+
+    mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
+    buffer = BytesIO()
+    await mrd.open()
+    # (0, 0) means read the whole object
+    await mrd.download_ranges([(0, 0, buffer)])
+    await mrd.close()
+    assert buffer.getvalue() == object_data
+    assert mrd.persisted_size == object_size
+
+    # Clean up; use the JSON client (i.e. `storage_client` fixture) to delete.
+    blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

@pytest.mark.asyncio
async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete):
object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}"
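Aside: _get_equal_dist is a pre-existing helper in tests/system/test_zonal.py and is not part of this diff. A plausible sketch, assuming it returns the two interior cut points that split the range into three roughly equal slices:

def _get_equal_dist(start, end):
    # Hypothetical reconstruction -- the real helper lives elsewhere in this
    # module; only the two-cut-points contract matters to the test above.
    step = (end - start) // 3
    return start + step, start + 2 * step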
68 changes: 64 additions & 4 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
@@ -21,6 +21,9 @@
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
AsyncAppendableObjectWriter,
)
+from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+    _MAX_CHUNK_SIZE_BYTES,
+)
from google.cloud import _storage_v2


@@ -29,6 +32,7 @@
GENERATION = 123
WRITE_HANDLE = b"test-write-handle"
PERSISTED_SIZE = 456
+EIGHT_MIB = 8 * 1024 * 1024


@pytest.fixture
@@ -52,6 +56,7 @@ def test_init(mock_write_object_stream, mock_client):
assert not writer._is_stream_open
assert writer.offset is None
assert writer.persisted_size is None
+    assert writer.bytes_appended_since_last_flush == 0

mock_write_object_stream.assert_called_once_with(
client=mock_client,
@@ -78,6 +83,7 @@ def test_init_with_optional_args(mock_write_object_stream, mock_client):

assert writer.generation == GENERATION
assert writer.write_handle == WRITE_HANDLE
+    assert writer.bytes_appended_since_last_flush == 0

mock_write_object_stream.assert_called_once_with(
client=mock_client,
@@ -88,6 +94,60 @@
)


+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+def test_init_with_writer_options(mock_write_object_stream, mock_client):
+    """Test the constructor with writer_options."""
+    writer = AsyncAppendableObjectWriter(
+        mock_client,
+        BUCKET,
+        OBJECT,
+        writer_options={"FLUSH_INTERVAL_BYTES": EIGHT_MIB},
+    )
+
+    assert writer.flush_interval == EIGHT_MIB
+    assert writer.bytes_appended_since_last_flush == 0
+
+    mock_write_object_stream.assert_called_once_with(
+        client=mock_client,
+        bucket_name=BUCKET,
+        object_name=OBJECT,
+        generation_number=None,
+        write_handle=None,
+    )
+
+
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+def test_init_with_flush_interval_less_than_chunk_size_raises_error(mock_write_object_stream, mock_client):
+    """Test that an OutOfRange error is raised if flush_interval is less than the chunk size."""
+
+    with pytest.raises(exceptions.OutOfRange):
+        AsyncAppendableObjectWriter(
+            mock_client,
+            BUCKET,
+            OBJECT,
+            writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES - 1},
+        )
+
+
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+def test_init_with_flush_interval_not_multiple_of_chunk_size_raises_error(mock_write_object_stream, mock_client):
+    """Test that an OutOfRange error is raised if flush_interval is not a multiple of the chunk size."""
+
+    with pytest.raises(exceptions.OutOfRange):
+        AsyncAppendableObjectWriter(
+            mock_client,
+            BUCKET,
+            OBJECT,
+            writer_options={"FLUSH_INTERVAL_BYTES": _MAX_CHUNK_SIZE_BYTES + 1},
+        )

@mock.patch("google.cloud.storage._experimental.asyncio._utils.google_crc32c")
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
@@ -477,7 +537,7 @@ async def test_append_flushes_when_buffer_is_full(
):
"""Test that append flushes the stream when the buffer size is reached."""
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
-        _MAX_BUFFER_SIZE_BYTES,
+        _DEFAULT_FLUSH_INTERVAL_BYTES,
)

writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -487,7 +547,7 @@
mock_stream.send = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

-    data = b"a" * _MAX_BUFFER_SIZE_BYTES
+    data = b"a" * _DEFAULT_FLUSH_INTERVAL_BYTES
await writer.append(data)

writer.simple_flush.assert_awaited_once()
@@ -500,7 +560,7 @@
async def test_append_handles_large_data(mock_write_object_stream, mock_client):
"""Test that append handles data larger than the buffer size."""
from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
-        _MAX_BUFFER_SIZE_BYTES,
+        _DEFAULT_FLUSH_INTERVAL_BYTES,
)

writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
@@ -510,7 +570,7 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
mock_stream.send = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

-    data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
+    data = b"a" * (_DEFAULT_FLUSH_INTERVAL_BYTES * 2 + 1)
await writer.append(data)

assert writer.simple_flush.await_count == 2
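A quick sanity check of the flush arithmetic behind the two tests above (plain arithmetic, not part of the diff): with 2 MiB chunks and the 16 MiB default interval, a flush fires after every eighth chunk.

_CHUNK = 2 * 1024 * 1024
_INTERVAL = 16 * 1024 * 1024  # _DEFAULT_FLUSH_INTERVAL_BYTES
# One full interval -> 8 chunks -> exactly one flush (assert_awaited_once).
assert _INTERVAL // _CHUNK == 8
# Two intervals plus one byte -> 17 chunks (ceil division); flushes fire
# after chunks 8 and 16, so await_count == 2 and one byte stays unflushed.
total = 2 * _INTERVAL + 1
assert -(-total // _CHUNK) == 17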