diff --git a/benchmarks/async_tasks_downloade_mp.py b/benchmarks/async_tasks_downloade_mp.py new file mode 100644 index 000000000..aa8d4f933 --- /dev/null +++ b/benchmarks/async_tasks_downloade_mp.py @@ -0,0 +1,88 @@ +import asyncio +import time +from io import BytesIO +from concurrent.futures import ThreadPoolExecutor +from multiprocessing import Pool + + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + +BUCKET_NAME = "chandrasiri-rs" +OBJECT_SIZE = 100 * 1024 * 1024 + + +async def download_object_async(bucket_name, object_name, client=None): + """Downloads a single object.""" + if client is None: + client = AsyncGrpcClient().grpc_client + + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + buffer = BytesIO() + await mrd.download_ranges(read_ranges=[(0, 0, buffer)]) + await mrd.close() + + assert buffer.getbuffer().nbytes == OBJECT_SIZE + + # Save the downloaded object to a local file + # with open(object_name, "wb") as f: + # f.write(buffer.getvalue()) + + print(f"Finished downloading {object_name}") + + +async def download_objects_pool(start_obj_num, end_obj_num): + """ """ + print(f"starting for {start_obj_num}, {end_obj_num}") + + client = AsyncGrpcClient().grpc_client + tasks = [] + pool_start_time = time.monotonic_ns() + for obj_num in range(start_obj_num, end_obj_num): + tasks.append( + asyncio.create_task( + download_object_async(BUCKET_NAME, f"para_64-{obj_num}", client=client) + ) + ) + + await asyncio.gather(*tasks) + pool_end_time = time.monotonic_ns() + print( + f"for {start_obj_num} , {end_obj_num}, {end_obj_num - start_obj_num} tasks done! 
in {(pool_end_time - pool_start_time) / (10**9)}s"
+    )
+
+
+def async_runner(start_obj_num, end_obj_num):
+    asyncio.run(download_objects_pool(start_obj_num, end_obj_num))
+
+
+def main():
+    num_object = 3000
+    process_count = 60
+    objects_per_process = num_object // process_count  # 50 objects per process
+    args = []
+    start_obj_num = 0
+    for _ in range(process_count):
+        args.append((start_obj_num, start_obj_num + objects_per_process))
+        start_obj_num += objects_per_process
+    # print(f"start {process_count} proc")
+    start_time_proc = time.monotonic_ns()
+    print(args, len(args))
+
+    with Pool(processes=process_count) as pool:
+        results = pool.starmap(async_runner, args)
+    end_time_proc = time.monotonic_ns()
+
+    print(
+        f"TOTAL: bytes - {num_object*OBJECT_SIZE}, time: {(end_time_proc - start_time_proc) / (10**9)}s"
+    )
+    print(
+        f"Throughput: {num_object*OBJECT_SIZE /((end_time_proc - start_time_proc) / (10**9))*10**-6} MBps"
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/benchmarks/async_tasks_downloader.py b/benchmarks/async_tasks_downloader.py
new file mode 100644
index 000000000..0cb54179f
--- /dev/null
+++ b/benchmarks/async_tasks_downloader.py
@@ -0,0 +1,70 @@
+import asyncio
+import time
+from io import BytesIO
+from concurrent.futures import ThreadPoolExecutor
+
+from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
+from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
+    AsyncMultiRangeDownloader,
+)
+
+BUCKET_NAME = "chandrasiri-rs"
+OBJECT_SIZE = 100 * 1024 * 1024
+
+
+async def download_object_async(bucket_name, object_name, client=None):
+    """Downloads a single object."""
+    if client is None:
+        client = AsyncGrpcClient().grpc_client
+
+    mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
+    await mrd.open()
+    buffer = BytesIO()
+    await mrd.download_ranges(read_ranges=[(0, 0, buffer)])
+    await mrd.close()
+
+    assert buffer.getbuffer().nbytes == OBJECT_SIZE
+
+    # Save the downloaded object to a local file
+    # with open(object_name, "wb") as f:
+    #     f.write(buffer.getvalue())
+
+    # print(f"Finished downloading {object_name}")
+
+
+async def download_objects_pool(start_obj_num, end_obj_num):
+    """Downloads objects numbered [start_obj_num, end_obj_num) concurrently with a shared client."""
+
+    client = AsyncGrpcClient().grpc_client
+    tasks = []
+    pool_start_time = time.monotonic_ns()
+    for obj_num in range(start_obj_num, end_obj_num):
+        tasks.append(
+            asyncio.create_task(
+                download_object_async(BUCKET_NAME, f"para_64-{obj_num}", client=client)
+            )
+        )
+
+    await asyncio.gather(*tasks)
+    pool_end_time = time.monotonic_ns()
+    print(
+        f"{end_obj_num - start_obj_num} tasks done! in {(pool_end_time - pool_start_time) / (10**9)}s"
+    )
+
+
+async def main():
+    """Main function to orchestrate downloads in sequential batches of asyncio tasks."""
+    num_objects = 1000
+    pool_size = 100
+    start_time = time.monotonic_ns()
+
+    for i in range(0, num_objects, pool_size):
+        await download_objects_pool(i, i + pool_size)
+    end_time = time.monotonic_ns()
+    print(
+        f"FINISHED: total bytes downloaded - {num_objects*OBJECT_SIZE} in time {(end_time - start_time) / (10**9)}s"
+    )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/benchmarks/download_one_object_using_n_streams.py b/benchmarks/download_one_object_using_n_streams.py
new file mode 100644
index 000000000..8d1fd9a32
--- /dev/null
+++ b/benchmarks/download_one_object_using_n_streams.py
@@ -0,0 +1,181 @@
+import argparse
+import asyncio
+from io import BytesIO
+import os
+import time
+import threading
+
+from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
+from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
+    AsyncMultiRangeDownloader,
+)
+
+
+async def download_range_async(
+    client, bucket_name, object_name, start_byte, end_byte, chunk_size
+):
+    """
+    Downloads a specific byte range of an object.
+    This is a modified version of the original download_one_async, adapted to
+    download a portion of an object.
+    """
+    download_size = end_byte - start_byte
+    print(
+        f"Downloading {object_name} from byte {start_byte} to {end_byte} (size {download_size}) in chunks of {chunk_size} from {bucket_name} from process {os.getpid()} and thread {threading.get_ident()}"
+    )
+
+    mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
+    await mrd.open()
+
+    offset = 0
+    output_buffer = BytesIO()
+
+    start_time = time.perf_counter()
+    while offset < download_size:
+        bytes_to_download = min(chunk_size, download_size - offset)
+        await mrd.download_ranges(
+            [(start_byte + offset, bytes_to_download, output_buffer)]
+        )
+        offset += bytes_to_download
+    end_time = time.perf_counter()
+
+    elapsed_time = end_time - start_time
+    throughput_mbs = (
+        (download_size / elapsed_time) / (1000 * 1000) if elapsed_time > 0 else 0
+    )
+
+    print(f"Time taken for download loop: {elapsed_time:.4f} seconds")
+    print(f"Throughput for this range: {throughput_mbs:.2f} MB/s")
+
+    assert (
+        output_buffer.getbuffer().nbytes == download_size
+    ), f"downloaded size incorrect for portion of {object_name}"
+
+    await mrd.close()
+    return output_buffer
+
+
+async def download_one_object_with_n_streams_async(
+    bucket_name, object_name, download_size, chunk_size, num_streams
+):
+    """
+    Downloads a single object using 'n' concurrent streams.
+    It divides the object into 'n' parts and creates an async task to download each part.
+    """
+    print(
+        f"Downloading {object_name} of size {download_size} from {bucket_name} using {num_streams} streams."
+    )
+
+    # Create one client to be shared by all download tasks.
+    client = AsyncGrpcClient().grpc_client
+
+    tasks = []
+
+    # Calculate the byte range for each stream.
+    portion_size = download_size // num_streams
+
+    for i in range(num_streams):
+        start = i * portion_size
+        end = start + portion_size
+        if i == num_streams - 1:
+            # The last stream downloads any remaining bytes.
+            end = download_size
+
+        task = asyncio.create_task(
+            download_range_async(
+                client, bucket_name, object_name, start, end, chunk_size
+            )
+        )
+        tasks.append(task)
+
+    # Wait for all download tasks to complete.
+ downloaded_parts = await asyncio.gather(*tasks) + + # Stitch the downloaded parts together in the correct order. + final_buffer = BytesIO() + for part in downloaded_parts: + final_buffer.write(part.getbuffer()) + + # Verify the final size. + final_size = final_buffer.getbuffer().nbytes + assert ( + final_size == download_size + ), f"Downloaded size incorrect for {object_name}. Expected {download_size}, got {final_size}" + print(f"Successfully downloaded {object_name} with size {final_size}") + + +def main(): + parser = argparse.ArgumentParser( + description="Download a single GCS object using multiple concurrent streams." + ) + parser.add_argument("--bucket_name", type=str, default="chandrasiri-rs") + parser.add_argument( + "--download_size", type=int, default=1024 * 1024 * 1024 + ) # 1 GiB + parser.add_argument( + "--chunk_size", type=int, default=64 * 1024 * 1024 + ) # 64 MiB + parser.add_argument( + "--count", + type=int, + default=1, + help="Number of times to run the download (for benchmarking).", + ) + parser.add_argument( + "--start_object_num", + type=int, + default=0, + help="The number of the object to download (e.g., py-sdk-mb-mt-{start_object_num}).", + ) + parser.add_argument( + "-n", + "--num_workers", + type=int, + default=10, + help="Number of streams to use for downloading.", + ) + args = parser.parse_args() + + total_start_time = time.perf_counter() + + object_name = f"py-sdk-mb-mt-{args.start_object_num}" + + for i in range(args.count): + print(f"\n--- Starting download run {i+1}/{args.count} ---") + run_start_time = time.perf_counter() + + asyncio.run( + download_one_object_with_n_streams_async( + args.bucket_name, + object_name, + args.download_size, + args.chunk_size, + args.num_workers, + ) + ) + + run_end_time = time.perf_counter() + run_latency = run_end_time - run_start_time + run_throughput = (args.download_size / run_latency) / (1000 * 1000) + print(f"Run {i+1} throughput: {run_throughput:.2f} MB/s") + + total_end_time = time.perf_counter() + total_latency = total_end_time - total_start_time + total_downloaded_bytes = args.download_size * args.count + aggregate_throughput = (total_downloaded_bytes / total_latency) / ( + 1000 * 1000 + ) # MB/s + + print("\n--- Aggregate Results ---") + print(f"Total download runs: {args.count}") + print(f"Object name: {object_name}") + print( + f"Total data downloaded: {total_downloaded_bytes / (1024*1024*1024):.2f} GiB" + ) + print(f"Total time taken: {total_latency:.2f} seconds") + print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s") + print(f"Number of streams used per download: {args.num_workers}") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/download_zb_n_workers.py b/benchmarks/download_zb_n_workers.py new file mode 100644 index 000000000..0ab402120 --- /dev/null +++ b/benchmarks/download_zb_n_workers.py @@ -0,0 +1,80 @@ +import argparse +import asyncio +from io import BytesIO +import os +import time +import threading +import random +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + +async def download_one_async(bucket_name, object_name, download_size, chunk_size): + """Downloads a single object of size `download_size`, in chunks of `chunk_size`""" + print(f"Downloading {object_name} of size {download_size} in chunks of {chunk_size} from {bucket_name} from 
process {os.getpid()} and thread {threading.get_ident()}") + # raise NotImplementedError("This function is not yet implemented.") + client = AsyncGrpcClient().grpc_client + + # log_peak_memory_usage() + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + + # download in chunks of `chunk_size` + offset = 0 + output_buffer = BytesIO() + while offset < download_size: + bytes_to_download = min(chunk_size, download_size - offset) + await mrd.download_ranges([(offset, bytes_to_download, output_buffer)]) + offset += bytes_to_download + # await mrd.download_ranges([(offset, 0, output_buffer)]) + assert output_buffer.getbuffer().nbytes == download_size, f"downloaded size incorrect for {object_name}" + + await mrd.close() + +def download_one_sync(bucket_name, object_name, download_size, chunk_size): + """Wrapper to run the async download_one in a new event loop.""" + asyncio.run(download_one_async(bucket_name, object_name, download_size, chunk_size)) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", type=str, default='chandrasiri-rs') + parser.add_argument("--download_size", type=int, default=1024 * 1024 * 1024) # 1 GiB + parser.add_argument("--chunk_size", type=int, default=100 * 1024 * 1024) # 100 MiB + parser.add_argument("--count", type=int, default=100) + parser.add_argument("--start_object_num", type=int, default=0) + parser.add_argument("-n", "--num_workers", type=int, default=2, help="Number of worker threads or processes.") + parser.add_argument("--executor", type=str, choices=['thread', 'process'], default='process', help="Executor to use: 'thread' for ThreadPoolExecutor, 'process' for ProcessPoolExecutor") + args = parser.parse_args() + + total_start_time = time.perf_counter() + + ExecutorClass = ThreadPoolExecutor if args.executor == 'thread' else ProcessPoolExecutor + object_count = args.count + with ExecutorClass(max_workers=args.num_workers) as executor: + futures = [] + for i in range(args.start_object_num, args.start_object_num + object_count): + object_name = f"py-sdk-mb-mt-{i}" + future = executor.submit(download_one_sync, args.bucket_name, object_name, args.download_size, args.chunk_size) + futures.append(future) + + for future in futures: + future.result() # wait for all workers to complete and raise exceptions + + total_end_time = time.perf_counter() + total_latency = total_end_time - total_start_time + total_downloaded_bytes = args.download_size * object_count + aggregate_throughput = (total_downloaded_bytes / total_latency) / (1000 * 1000) # MB/s + + print("\n--- Aggregate Results ---") + print(f"Total objects to download: {object_count}") + print(f"Total data to download: {total_downloaded_bytes / (1024*1024*1024):.2f} GiB") + print(f"Total time taken: {total_latency:.2f} seconds") + print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/download_zb_n_workers_same_client.py b/benchmarks/download_zb_n_workers_same_client.py new file mode 100644 index 000000000..99bf66d12 --- /dev/null +++ b/benchmarks/download_zb_n_workers_same_client.py @@ -0,0 +1,121 @@ +import argparse +import asyncio +from io import BytesIO +import os +import time +import threading +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, 
+) + + +async def download_one_async(client, bucket_name, object_name, download_size, chunk_size): + """ + Downloads a single object of size `download_size`, in chunks of `chunk_size`. + Accepts an existing client object. + """ + print( + f"Downloading {object_name} of size {download_size} in chunks of {chunk_size} from {bucket_name} from process {os.getpid()} and thread {threading.get_ident()}" + ) + + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + + # download in chunks of `chunk_size` + offset = 0 + output_buffer = BytesIO() + while offset < download_size: + bytes_to_download = min(chunk_size, download_size - offset) + await mrd.download_ranges([(offset, bytes_to_download, output_buffer)]) + offset += bytes_to_download + + assert ( + output_buffer.getbuffer().nbytes == download_size + ), f"downloaded size incorrect for {object_name}" + + await mrd.close() + + +def download_one_sync(bucket_name, object_name, download_size, chunk_size): + """Wrapper to create a client and run the async download in a new event loop.""" + + async def _async_wrapper(): + """Creates client and calls the download function.""" + client = AsyncGrpcClient().grpc_client + await download_one_async( + client, bucket_name, object_name, download_size, chunk_size + ) + + asyncio.run(_async_wrapper()) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", type=str, default="chandrasiri-rs") + parser.add_argument( + "--download_size", type=int, default=1024 * 1024 * 1024 + ) # 1 GiB + parser.add_argument( + "--chunk_size", type=int, default=64 * 1024 * 1024 + ) # 100 MiB + parser.add_argument("--count", type=int, default=100) + parser.add_argument("--start_object_num", type=int, default=0) + parser.add_argument( + "-n", + "--num_workers", + type=int, + default=2, + help="Number of worker threads or processes.", + ) + parser.add_argument( + "--executor", + type=str, + choices=["thread", "process"], + default="process", + help="Executor to use: 'thread' for ThreadPoolExecutor, 'process' for ProcessPoolExecutor", + ) + args = parser.parse_args() + + total_start_time = time.perf_counter() + + ExecutorClass = ( + ThreadPoolExecutor if args.executor == "thread" else ProcessPoolExecutor + ) + object_count = args.count + with ExecutorClass(max_workers=args.num_workers) as executor: + futures = [] + for i in range(args.start_object_num, args.start_object_num + object_count): + object_name = f"py-sdk-mb-mt-{i}" + future = executor.submit( + download_one_sync, + args.bucket_name, + object_name, + args.download_size, + args.chunk_size, + ) + futures.append(future) + + for future in futures: + future.result() # wait for all workers to complete and raise exceptions + + total_end_time = time.perf_counter() + total_latency = total_end_time - total_start_time + total_downloaded_bytes = args.download_size * object_count + aggregate_throughput = (total_downloaded_bytes / total_latency) / ( + 1000 * 1000 + ) # MB/s + + print("\n--- Aggregate Results ---") + print(f"Total objects to download: {object_count}") + print( + f"Total data to download: {total_downloaded_bytes / (1024*1024*1024):.2f} GiB" + ) + print(f"Total time taken: {total_latency:.2f} seconds") + print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/find_crc.py b/benchmarks/find_crc.py new file mode 100644 index 000000000..3268cc9f9 --- /dev/null +++ b/benchmarks/find_crc.py @@ -0,0 +1,43 @@ +import google_crc32c +import base64 
+import sys + +# The file to check +# file_path = 'part_data.bin' +# file_path = "/usr/local/google/home/chandrasiri/checksum/dec8/chunk3" # Example file path, replace with your actual file +file_path = sys.argv[1] +# # Example file path, replace with your actual file + +try: + crc32c_int = 0 + # data = b"" + (b"A" * 256 * 1024) + b"" + # crc32c_int = google_crc32c.extend(crc32c_int, data) + with open(file_path, "rb") as f: + # Calculate the CRC32C checksum as an integer + crc32c_int = 0 + while True: + chunk = f.read(1024 * 1024 * 1024) # Read in 64KiB chunks + if not chunk: + break + crc32c_int = google_crc32c.extend(crc32c_int, chunk) + + # --- Format the checksum in all three ways --- + # 1. Hexadecimal format (32-bit, zero-padded) + crc32c_hex = f"{crc32c_int:08x}" + + # 2. Base64 format (from big-endian bytes) + crc32c_bytes = crc32c_int.to_bytes(4, "big") + base64_encoded = base64.b64encode(crc32c_bytes) + print("this is the crc32c in base64_encoded (in bytes) ", base64_encoded) + crc32c_base64 = base64_encoded.decode("utf-8") + + # --- Print the results --- + print(f"✅ Checksum results for '{file_path}':") + print(f" - Integer: {crc32c_int}") + print(f" - Hex: {crc32c_hex}") + print(f" - Base64 (after decoding ): {crc32c_base64}") + +except FileNotFoundError: + print(f"❌ Error: The file '{file_path}' was not found.") +except Exception as e: + print(f"An error occurred: {e}") diff --git a/benchmarks/parallel_download_and_save.py b/benchmarks/parallel_download_and_save.py new file mode 100644 index 000000000..a97158e86 --- /dev/null +++ b/benchmarks/parallel_download_and_save.py @@ -0,0 +1,144 @@ + +import argparse +import asyncio +from io import BytesIO +import multiprocessing +import os +import time +import threading +from google.cloud import storage +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + + +async def download_range_async( + client, bucket_name, object_name, start_byte, end_byte, chunk_size +): + """ + Downloads a specific byte range of an object and returns the bytes. + """ + download_size = end_byte - start_byte + print( + f"Downloading {object_name} from byte {start_byte} to {end_byte} (size {download_size}) in chunks of {chunk_size} from {bucket_name} from process {os.getpid()} and thread {threading.get_ident()}" + ) + + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + + offset = 0 + output_buffer = BytesIO() + while offset < download_size: + bytes_to_download = min(chunk_size, download_size - offset) + await mrd.download_ranges([(start_byte + offset, bytes_to_download, output_buffer)]) + offset += bytes_to_download + + assert ( + output_buffer.getbuffer().nbytes == download_size + ), f"downloaded size incorrect for portion of {object_name}" + + await mrd.close() + return (start_byte, output_buffer.getvalue()) + + +async def run_coroutines(bucket_name, object_name, chunk_size, ranges): + """ + Run coroutines for downloading specified ranges and return the downloaded chunks. 
+ """ + client = AsyncGrpcClient().grpc_client + tasks = [] + for start, end in ranges: + task = asyncio.create_task( + download_range_async( + client, bucket_name, object_name, start, end, chunk_size + ) + ) + tasks.append(task) + + results = await asyncio.gather(*tasks) + return results + + +def worker(bucket_name, object_name, chunk_size, ranges): + """ + A worker process that downloads a set of byte ranges and returns the data. + """ + print(f"Process {os.getpid()} starting with {len(ranges)} ranges.") + downloaded_chunks = asyncio.run(run_coroutines(bucket_name, object_name, chunk_size, ranges)) + return downloaded_chunks + + +def main(): + parser = argparse.ArgumentParser( + description="Download a GCS object in parallel and save it to a file." + ) + parser.add_argument("--bucket_name", type=str, default="chandrasiri-rs", help="GCS bucket name.") + parser.add_argument("--object_name", type=str, required=True, help="GCS object name.") + parser.add_argument("--output_file", type=str, required=True, help="Path to save the downloaded file.") + parser.add_argument("--size", type=int, default=1024**3, help="Object size.") + parser.add_argument("-n", "--num_processes", type=int, default=4, help="Number of processes to use.") + parser.add_argument("-m", "--num_coroutines_per_process", type=int, default=2, help="Number of coroutines per process.") + parser.add_argument("--chunk_size", type=int, default=64 * 1024 * 1024, help="Chunk size for each download stream.") + + args = parser.parse_args() + + # Get object size from args + object_size = args.size + + total_coroutines = args.num_processes * args.num_coroutines_per_process + + print(f"Starting download of {args.object_name} ({object_size} bytes) from bucket {args.bucket_name}") + print(f"Using {args.num_processes} processes and {args.num_coroutines_per_process} coroutines per process ({total_coroutines} total workers).") + + # Calculate ranges + base_range_size = object_size // total_coroutines + remainder = object_size % total_coroutines + + ranges = [] + current_byte = 0 + for i in range(total_coroutines): + range_size = base_range_size + (1 if i < remainder else 0) + start_byte = current_byte + end_byte = current_byte + range_size + ranges.append((start_byte, end_byte)) + current_byte = end_byte + + # Distribute adjacent ranges among processes + ranges_per_process = args.num_coroutines_per_process + process_ranges = [ + ranges[i * ranges_per_process : (i + 1) * ranges_per_process] + for i in range(args.num_processes) + ] + + start_time = time.perf_counter() + + with multiprocessing.Pool(args.num_processes) as pool: + results = pool.starmap(worker, [(args.bucket_name, args.object_name, args.chunk_size, pr) for pr in process_ranges]) + + # Flatten the list of lists, sort by start_byte, and write to file + all_chunks = [chunk for process_result in results for chunk in process_result] + all_chunks.sort(key=lambda x: x[0]) + + total_downloaded_bytes = 0 + with open(args.output_file, "wb") as f: + for start_byte, data in all_chunks: + f.write(data) + total_downloaded_bytes += len(data) + + end_time = time.perf_counter() + + print("\n--- Download Complete ---") + print(f"File saved to: {args.output_file}") + print(f"Total downloaded bytes: {total_downloaded_bytes}") + assert total_downloaded_bytes == object_size, "Mismatch in downloaded size" + + duration = end_time - start_time + throughput = total_downloaded_bytes / duration / (1024 * 1024) # MB/s + + print(f"Total time: {duration:.2f} seconds") + print(f"Throughput: {throughput:.2f} MB/s") 
+ + +if __name__ == "__main__": + main() diff --git a/benchmarks/parallel_download_mp_mc.py b/benchmarks/parallel_download_mp_mc.py new file mode 100644 index 000000000..2ea5a56c5 --- /dev/null +++ b/benchmarks/parallel_download_mp_mc.py @@ -0,0 +1,161 @@ + +import argparse +import asyncio +from io import BytesIO +import multiprocessing +import os +import time +import threading +from google.cloud import storage +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + + +async def download_range_async( + client, bucket_name, object_name, start_byte, end_byte, chunk_size +): + """ + Downloads a specific byte range of an object. + This is a modified version of the original download_one_async, adapted to + download a portion of an object. + """ + download_size = end_byte - start_byte + print( + f"Downloading {object_name} from byte {start_byte} to {end_byte} (size {download_size}) in chunks of {chunk_size} from {bucket_name} from process {os.getpid()} and thread {threading.get_ident()}" + ) + + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + + offset = 0 + output_buffer = BytesIO() + + start_time = time.perf_counter() + while offset < download_size: + bytes_to_download = min(chunk_size, download_size - offset) + await mrd.download_ranges( + [(start_byte + offset, bytes_to_download, output_buffer)] + ) + offset += bytes_to_download + end_time = time.perf_counter() + + elapsed_time = end_time - start_time + throughput_mbs = ( + (download_size / elapsed_time) / (1000 * 1000) if elapsed_time > 0 else 0 + ) + + print(f"Time taken for download loop: {elapsed_time:.4f} seconds") + print(f"Throughput for this range: {throughput_mbs:.2f} MB/s") + + assert ( + output_buffer.getbuffer().nbytes == download_size + ), f"downloaded size incorrect for portion of {object_name}" + + await mrd.close() + return output_buffer.getbuffer().nbytes + + +async def run_coroutines(bucket_name, object_name, chunk_size, ranges): + """ + Run coroutines for downloading specified ranges. + """ + client = AsyncGrpcClient().grpc_client + tasks = [] + for start, end in ranges: + task = asyncio.create_task( + download_range_async( + client, bucket_name, object_name, start, end, chunk_size + ) + ) + tasks.append(task) + + results = await asyncio.gather(*tasks) + return sum(results) + + +def worker(bucket_name, object_name, chunk_size, ranges): + """ + A worker process that downloads a set of byte ranges using asyncio. + """ + print(f"Process {os.getpid()} starting with {len(ranges)} ranges.") + downloaded_bytes = asyncio.run(run_coroutines(bucket_name, object_name, chunk_size, ranges)) + return downloaded_bytes + + +def main(): + parser = argparse.ArgumentParser( + description="Download a GCS object in parallel using multiple processes and coroutines." 
+ ) + parser.add_argument("--bucket_name", type=str, default="chandrasiri-rs", help="GCS bucket name.") + parser.add_argument("--object_name", type=str, required=True, help="GCS object name.") + parser.add_argument("--size", type=int, default=1024**3, help="Object size.") + parser.add_argument("-n", "--num_processes", type=int, default=4, help="Number of processes to use.") + parser.add_argument("-m", "--num_coroutines_per_process", type=int, default=2, help="Number of coroutines per process.") + parser.add_argument("--chunk_size", type=int, default=64 * 1024 * 1024, help="Chunk size for each download stream.") + + args = parser.parse_args() + + # Get object size + # storage_client = storage.Client() + # bucket = storage_client.bucket(args.bucket_name) + # blob = bucket.get_blob(args.object_name) + # if not blob: + # print(f"Error: Object '{args.object_name}' not found in bucket '{args.bucket_name}'.") + # return + # object_size = blob.size + object_size = args.size + + total_coroutines = args.num_processes * args.num_coroutines_per_process + + print(f"Starting download of {args.object_name} ({object_size} bytes) from bucket {args.bucket_name}") + print(f"Using {args.num_processes} processes and {args.num_coroutines_per_process} coroutines per process ({total_coroutines} total workers).") + + # Calculate ranges + base_range_size = object_size // total_coroutines + remainder = object_size % total_coroutines + + ranges = [] + current_byte = 0 + for i in range(total_coroutines): + range_size = base_range_size + (1 if i < remainder else 0) + start_byte = current_byte + end_byte = current_byte + range_size + ranges.append((start_byte, end_byte)) + current_byte = end_byte + print('*'*80) + print(ranges) + print('*'*80) + + # Distribute adjacent ranges among processes + ranges_per_process = args.num_coroutines_per_process + process_ranges = [ + ranges[i * ranges_per_process : (i + 1) * ranges_per_process] + for i in range(args.num_processes) + ] + print('*'*80) + print(process_ranges) + print('*'*80) + + start_time = time.perf_counter() + + with multiprocessing.Pool(args.num_processes) as pool: + results = pool.starmap(worker, [(args.bucket_name, args.object_name, args.chunk_size, pr) for pr in process_ranges]) + + total_downloaded_bytes = sum(results) + end_time = time.perf_counter() + + print("\n--- Download Complete ---") + print(f"Total downloaded bytes: {total_downloaded_bytes}") + assert total_downloaded_bytes == object_size, "Mismatch in downloaded size" + + duration = end_time - start_time + throughput = total_downloaded_bytes / duration / (1024 * 1024) # MB/s + + print(f"Total time: {duration:.2f} seconds") + print(f"Throughput: {throughput:.2f} MB/s") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/parallel_downloader.py b/benchmarks/parallel_downloader.py new file mode 100644 index 000000000..7c8696610 --- /dev/null +++ b/benchmarks/parallel_downloader.py @@ -0,0 +1,68 @@ +import asyncio +import multiprocessing +import os +import time +from io import BytesIO +from multiprocessing import Pool + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + + +async def download_object_async(bucket_name, object_name): + """Downloads a single object and saves it to a local file.""" + client = AsyncGrpcClient().grpc_client + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + buffer = 
BytesIO()
+    await mrd.download_ranges(read_ranges=[(0, 0, buffer)])
+    await mrd.close()
+
+    assert buffer.getbuffer().nbytes == 100 * 1024 * 1024  # 100 MiB
+
+    # Save the downloaded object to a local file
+    # with open(object_name, "wb") as f:
+    #     f.write(buffer.getvalue())
+
+    print(f"Finished downloading {object_name}")
+
+
+def download_worker(object_name):
+    """A synchronous wrapper to be called by multiprocessing."""
+    bucket_name = "chandrasiri-rs"
+    try:
+        asyncio.run(download_object_async(bucket_name, object_name))
+        return f"Successfully downloaded {object_name}"
+    except Exception as e:
+        print(f"Failed to download {object_name}: {e}")
+        raise
+
+
+def main():
+    """Main function to orchestrate parallel downloads."""
+    num_objects = 3000
+    num_processes = 64
+
+    object_names = [f"para_64-{i}" for i in range(num_objects)]
+
+    # Ensure the directory for downloaded files exists
+    # This example saves to the current working directory.
+    # For large numbers of files, consider a dedicated directory.
+    start_time = time.monotonic_ns()
+    with Pool(processes=num_processes) as pool:
+        for result in pool.imap_unordered(download_worker, object_names):
+            print(result)
+    end_time = time.monotonic_ns()
+
+    print(
+        f"\nFinished all download attempts for {num_objects} objects: took - {(end_time - start_time) / 10**9}s"
+    )
+
+
+if __name__ == "__main__":
+    # Using 'fork' as the start method is recommended for compatibility
+    # with asyncio in a multiprocessing context.
+    multiprocessing.set_start_method("fork", force=True)
+    main()
diff --git a/benchmarks/parallel_uploader.py b/benchmarks/parallel_uploader.py
new file mode 100644
index 000000000..5801c3667
--- /dev/null
+++ b/benchmarks/parallel_uploader.py
@@ -0,0 +1,58 @@
+import asyncio
+import multiprocessing
+import os
+from multiprocessing import Pool
+
+from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
+from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+    AsyncAppendableObjectWriter,
+)
+
+
+async def upload_object_async(bucket_name, object_name):
+    """Uploads a single object with 100MiB of random data."""
+    num_bytes_to_append = 100 * 1024 * 1024  # 100 MiB
+    data = os.urandom(num_bytes_to_append)
+
+    client = AsyncGrpcClient().grpc_client
+    writer = AsyncAppendableObjectWriter(
+        client=client, bucket_name=bucket_name, object_name=object_name
+    )
+
+    await writer.open()
+    await writer.append(data)
+    # await writer.flush()
+    await writer.close()
+    print(f"Finished uploading {object_name}")
+
+
+def upload_worker(object_name):
+    """A synchronous wrapper to be called by multiprocessing."""
+    bucket_name = "chandrasiri-rs"
+    try:
+        asyncio.run(upload_object_async(bucket_name, object_name))
+        return f"Successfully uploaded {object_name}"
+    except Exception as e:
+        print(f"Failed to upload {object_name}: {e}")
+        raise
+
+
+def main():
+    """Main function to orchestrate parallel uploads."""
+    num_objects = 3000
+    num_processes = 24
+
+    object_names = [f"para_64-{i}" for i in range(num_objects)]
+
+    with Pool(processes=num_processes) as pool:
+        for result in pool.imap_unordered(upload_worker, object_names):
+            print(result)
+
+    print(f"\nFinished all upload attempts for {num_objects} objects.")
+
+
+if __name__ == "__main__":
+    # Using 'fork' as the start method is recommended for compatibility
+    # with asyncio in a multiprocessing context.
+ multiprocessing.set_start_method("fork", force=True) + main() diff --git a/benchmarks/read_appendable_object.py b/benchmarks/read_appendable_object.py new file mode 100644 index 000000000..a1100b312 --- /dev/null +++ b/benchmarks/read_appendable_object.py @@ -0,0 +1,60 @@ + +import argparse +import asyncio +import sys +import os +import threading +import random +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + +async def get_persisted_size_async(bucket_name, object_name): + """Opens an object and prints its persisted size.""" + # print(f"Getting persisted size for {object_name} in process {os.getpid()} and thread {threading.get_ident()}") + client = AsyncGrpcClient().grpc_client + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + + one_gib = 1024 * 1024 * 1024 + if mrd.persisted_size != one_gib: + await mrd.close() + raise ValueError(f"Object {object_name} has an unexpected size. Expected {one_gib}, but got {mrd.persisted_size}") + # if random.randint(0, 100) % 5 == 0: + print(f"Object: {object_name}, Persisted Size: {mrd.persisted_size}") + with open('b.txt','wb') as fp: + await mrd.download_ranges([(0, mrd.persisted_size, fp)]) + + + await mrd.close() + +def get_persisted_size_sync(bucket_name, object_name): + """Wrapper to run the async get_persisted_size_async in a new event loop.""" + asyncio.run(get_persisted_size_async(bucket_name, object_name)) + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", type=str, default='chandrasiri-rs') + parser.add_argument("--count", type=int, default=100) + parser.add_argument("--start_object_num", type=int, default=0) + parser.add_argument("-n", "--num_workers", type=int, default=2) + parser.add_argument("--executor", type=str, choices=['thread', 'process'], default='process') + args = parser.parse_args() + + ExecutorClass = ThreadPoolExecutor if args.executor == 'thread' else ProcessPoolExecutor + + with ExecutorClass(max_workers=args.num_workers) as executor: + futures = [] + for i in range(args.start_object_num, args.start_object_num + args.count): + object_name = f"py-sdk-mb-mt-{i}" + future = executor.submit(get_persisted_size_sync, args.bucket_name, object_name) + futures.append(future) + + for future in futures: + future.result() + +if __name__ == "__main__": + main() diff --git a/benchmarks/threaded_downloader.py b/benchmarks/threaded_downloader.py new file mode 100644 index 000000000..c275429d3 --- /dev/null +++ b/benchmarks/threaded_downloader.py @@ -0,0 +1,71 @@ +import asyncio +import time +from io import BytesIO +from concurrent.futures import ThreadPoolExecutor + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) + + +async def download_object_async(bucket_name, object_name): + """Downloads a single object.""" + client = AsyncGrpcClient().grpc_client + mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name) + await mrd.open() + buffer = BytesIO() + await mrd.download_ranges(read_ranges=[(0, 0, buffer)]) + await mrd.close() + + assert buffer.getbuffer().nbytes == 100 * 1024 * 1024 # 100 MiB + + # Save the downloaded object to a local file + with open(object_name, 
"wb") as f: + f.write(buffer.getvalue()) + + print(f"Finished downloading {object_name}") + + +def download_worker(object_name): + """A synchronous wrapper to be called by a thread.""" + bucket_name = "chandrasiri-rs" + try: + # asyncio.run() creates a new event loop for each thread. + asyncio.run(download_object_async(bucket_name, object_name)) + return f"Successfully downloaded {object_name}" + except Exception as e: + # Log the error and return a failure message so other downloads can continue. + error_message = f"Failed to download {object_name}: {e}" + print(error_message) + return error_message + + +def main(): + """Main function to orchestrate parallel downloads using threads.""" + num_objects = 6 + num_threads = 1 + + object_names = [f"para_64-{i}" for i in range(num_objects)] + + print(f"Starting download of {num_objects} objects using {num_threads} threads...") + start_time = time.monotonic() + + with ThreadPoolExecutor(max_workers=num_threads) as executor: + # executor.map runs the worker function for each item in object_names + # and returns results as they are completed. + results = executor.map(download_worker, object_names) + for result in results: + # The result is printed here, but it could also be collected + # for a summary at the end. + pass # The worker already prints success or failure. + + end_time = time.monotonic() + + print( + f"\nFinished all download attempts for {num_objects} objects. Total time: {end_time - start_time:.2f}s" + ) + + +if __name__ == "__main__": + main() diff --git a/benchmarks/upload_zb_n_coros.py b/benchmarks/upload_zb_n_coros.py new file mode 100644 index 000000000..fd05cd04b --- /dev/null +++ b/benchmarks/upload_zb_n_coros.py @@ -0,0 +1,73 @@ +import argparse +import asyncio +import os +import time + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) + +async def upload_one(client, bucket_name, object_name, upload_size, chunk_size): + """Uploads a single object of size `upload_size`, in chunks of `chunk_size`""" + print(f"Uploading {object_name} of size {upload_size} in chunks of {chunk_size} to {bucket_name}") + + writer = AsyncAppendableObjectWriter( + client=client, bucket_name=bucket_name, object_name=object_name + ) + + await writer.open() + + start_time = time.perf_counter() + + uploaded_bytes = 0 + while uploaded_bytes < upload_size: + bytes_to_upload = min(chunk_size, upload_size - uploaded_bytes) + data = os.urandom(bytes_to_upload) + await writer.append(data) + uploaded_bytes += bytes_to_upload + + await writer.close() + + end_time = time.perf_counter() + latency = end_time - start_time + throughput = (upload_size / latency) / (1000 * 1000) # MB/s + + print(f"Finished uploading {object_name}") + print(f"Latency: {latency:.2f} seconds") + print(f"Throughput: {throughput:.2f} MB/s") + +async def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", type=str, default='chandrasiri-rs') + parser.add_argument("--upload_size", type=int, default=1024 * 1024 * 1024) # 1 GiB + parser.add_argument("--chunk_size", type=int, default=100 * 1024 * 1024) # 100 MiB + parser.add_argument("-n", "--num_coros", type=int, default=2) + args = parser.parse_args() + + client = AsyncGrpcClient().grpc_client + + total_start_time = time.perf_counter() + + tasks = [] + for i in range(args.num_coros): + object_name = f"py-sdk-mb-mc-{i}" + task = upload_one(client, 
args.bucket_name, object_name, args.upload_size, args.chunk_size) + tasks.append(task) + + await asyncio.gather(*tasks) + + total_end_time = time.perf_counter() + total_latency = total_end_time - total_start_time + total_uploaded_bytes = args.upload_size * args.num_coros + aggregate_throughput = (total_uploaded_bytes / total_latency) / (1000 * 1000) # MB/s + + print("\n--- Aggregate Results ---") + print(f"Total objects uploaded: {args.num_coros}") + print(f"Total data uploaded: {total_uploaded_bytes / (1024*1024*1024):.2f} GiB") + print(f"Total time taken: {total_latency:.2f} seconds") + print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/benchmarks/upload_zb_n_coros_semphore.py b/benchmarks/upload_zb_n_coros_semphore.py new file mode 100644 index 000000000..f13e512aa --- /dev/null +++ b/benchmarks/upload_zb_n_coros_semphore.py @@ -0,0 +1,81 @@ +import argparse +import asyncio +import os +import time + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) + +async def upload_one(client, bucket_name, object_name, upload_size, chunk_size): + """Uploads a single object of size `upload_size`, in chunks of `chunk_size`""" + print(f"Uploading {object_name} of size {upload_size} in chunks of {chunk_size} to {bucket_name}") + + writer = AsyncAppendableObjectWriter( + client=client, bucket_name=bucket_name, object_name=object_name + ) + + await writer.open() + + start_time = time.perf_counter() + + uploaded_bytes = 0 + while uploaded_bytes < upload_size: + bytes_to_upload = min(chunk_size, upload_size - uploaded_bytes) + data = os.urandom(bytes_to_upload) + await writer.append(data) + uploaded_bytes += bytes_to_upload + + await writer.close() + + end_time = time.perf_counter() + latency = end_time - start_time + throughput = (upload_size / latency) / (1000 * 1000) # MB/s + + print(f"Finished uploading {object_name}") + print(f"Latency: {latency:.2f} seconds") + print(f"Throughput: {throughput:.2f} MB/s") + +async def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", type=str, default='chandrasiri-rs') + parser.add_argument("--upload_size", type=int, default=1024 * 1024 * 1024) # 1 GiB + parser.add_argument("--chunk_size", type=int, default=100 * 1024 * 1024) # 100 MiB + parser.add_argument("--count", type=int, default=100) + parser.add_argument("--start_object_num", type=int, default=0) + parser.add_argument("-n", "--num_coros", type=int, default=8, help="Number of concurrent coroutines.") + args = parser.parse_args() + + client = AsyncGrpcClient().grpc_client + + total_start_time = time.perf_counter() + + semaphore = asyncio.Semaphore(args.num_coros) + tasks = [] + + async def upload_with_semaphore(client, bucket_name, object_name, upload_size, chunk_size): + async with semaphore: + await upload_one(client, bucket_name, object_name, upload_size, chunk_size) + + for i in range(args.start_object_num, args.start_object_num + args.count): + object_name = f"py-sdk-mb-mc-{i}" + task = asyncio.create_task(upload_with_semaphore(client, args.bucket_name, object_name, args.upload_size, args.chunk_size)) + tasks.append(task) + + await asyncio.gather(*tasks) + + total_end_time = time.perf_counter() + total_latency = total_end_time - total_start_time + total_uploaded_bytes = args.upload_size * args.count + 
aggregate_throughput = (total_uploaded_bytes / total_latency) / (1000 * 1000) # MB/s + + print("\n--- Aggregate Results ---") + print(f"Total objects uploaded: {args.count}") + print(f"Total data uploaded: {total_uploaded_bytes / (1024*1024*1024):.2f} GiB") + print(f"Total time taken: {total_latency:.2f} seconds") + print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/benchmarks/upload_zb_n_workers.py b/benchmarks/upload_zb_n_workers.py new file mode 100644 index 000000000..771c4cfae --- /dev/null +++ b/benchmarks/upload_zb_n_workers.py @@ -0,0 +1,87 @@ +import argparse +import asyncio +import os +import time +import threading +from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) + +async def upload_one_async(bucket_name, object_name, upload_size, chunk_size): + """Uploads a single object of size `upload_size`, in chunks of `chunk_size`""" + print(f"Uploading {object_name} of size {upload_size} in chunks of {chunk_size} to {bucket_name} from process {os.getpid()} and thread {threading.get_ident()}") + client = AsyncGrpcClient().grpc_client + writer = AsyncAppendableObjectWriter( + client=client, bucket_name=bucket_name, object_name=object_name + ) + + await writer.open() + + start_time = time.perf_counter() + + uploaded_bytes = 0 + while uploaded_bytes < upload_size: + bytes_to_upload = min(chunk_size, upload_size - uploaded_bytes) + data = os.urandom(bytes_to_upload) + await writer.append(data) + uploaded_bytes += bytes_to_upload + + await writer.close() + + end_time = time.perf_counter() + latency = end_time - start_time + throughput = (upload_size / latency) / (1000 * 1000) # MB/s + + print(f"Finished uploading {object_name}") + print(f"Latency: {latency:.2f} seconds") + print(f"Throughput: {throughput:.2f} MB/s") + +def upload_one_sync(bucket_name, object_name, upload_size, chunk_size): + """Wrapper to run the async upload_one in a new event loop.""" + asyncio.run(upload_one_async(bucket_name, object_name, upload_size, chunk_size)) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", type=str, default='chandrasiri-rs') + parser.add_argument("--upload_size", type=int, default=1024 * 1024 * 1024) # 1 GiB + parser.add_argument("--chunk_size", type=int, default=100 * 1024 * 1024) # 100 MiB + parser.add_argument("--count", type=int, default=100) + parser.add_argument("--start_object_num", type=int, default=0) + parser.add_argument("-n", "--num_workers", type=int, default=2, help="Number of worker threads or processes.") + parser.add_argument("--executor", type=str, choices=['thread', 'process'], default='process', help="Executor to use: 'thread' for ThreadPoolExecutor, 'process' for ProcessPoolExecutor") + args = parser.parse_args() + + + + total_start_time = time.perf_counter() + + ExecutorClass = ThreadPoolExecutor if args.executor == 'thread' else ProcessPoolExecutor + + with ExecutorClass(max_workers=args.num_workers) as executor: + futures = [] + for i in range(args.start_object_num, args.start_object_num + args.count): + object_name = f"py-sdk-mb-mt-{i}" + future = executor.submit(upload_one_sync, args.bucket_name, object_name, args.upload_size, args.chunk_size) + futures.append(future) + + for future in futures: 
+ future.result() # wait for all workers to complete + + total_end_time = time.perf_counter() + total_latency = total_end_time - total_start_time + total_uploaded_bytes = args.upload_size * args.count + aggregate_throughput = (total_uploaded_bytes / total_latency) / (1000 * 1000) # MB/s + + print("\n--- Aggregate Results ---") + print(f"Total objects uploaded: {args.count}") + print(f"Total data uploaded: {total_uploaded_bytes / (1024*1024*1024):.2f} GiB") + print(f"Total time taken: {total_latency:.2f} seconds") + print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s") + + +if __name__ == "__main__": + main() diff --git a/benchmarks/upload_zb_one_coro.py b/benchmarks/upload_zb_one_coro.py new file mode 100644 index 000000000..a294e2fe1 --- /dev/null +++ b/benchmarks/upload_zb_one_coro.py @@ -0,0 +1,53 @@ +import argparse +import asyncio +import os +import time + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) + +async def upload_one(client, bucket_name, object_name, upload_size, chunk_size): + """Uploads a single object of size `upload_size`, in chunks of `chunk_size`""" + print(f"Uploading {object_name} of size {upload_size} in chunks of {chunk_size} to {bucket_name}") + + writer = AsyncAppendableObjectWriter( + client=client, bucket_name=bucket_name, object_name=object_name + ) + + await writer.open() + + start_time = time.perf_counter() + + uploaded_bytes = 0 + while uploaded_bytes < upload_size: + bytes_to_upload = min(chunk_size, upload_size - uploaded_bytes) + data = os.urandom(bytes_to_upload) + await writer.append(data) + uploaded_bytes += bytes_to_upload + + await writer.close() + + end_time = time.perf_counter() + latency = end_time - start_time + throughput = (upload_size / latency) / (10**6) # MB/s + + print(f"Finished uploading {object_name}") + print(f"Latency: {latency:.2f} seconds") + print(f"Throughput: {throughput:.2f} MB/s") + +async def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--bucket_name", type=str, default='chandrasiri-rs') + parser.add_argument("--upload_size", type=int, default=1024 * 1024 * 1024) # 100 MiB + parser.add_argument("--chunk_size", type=int, default=100 * 1024 * 1024) # 10 MiB + args = parser.parse_args() + + client = AsyncGrpcClient().grpc_client + object_name = f"py-sdk-mb-1GiB-1" + + await upload_one(client, args.bucket_name, object_name, args.upload_size, args.chunk_size) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/perf/microbenchmarks/_utils.py b/tests/perf/microbenchmarks/_utils.py new file mode 100644 index 000000000..219856082 --- /dev/null +++ b/tests/perf/microbenchmarks/_utils.py @@ -0,0 +1,19 @@ +from typing import Any + + +def publish_benchmark_extra_info( + benchmark: Any, params: Any, benchmark_group: str = "read" +) -> None: + """ + Helper function to publish benchmark parameters to the extra_info property. 
+ """ + benchmark.extra_info["num_files"] = params.num_files + benchmark.extra_info["file_size"] = params.file_size_bytes + benchmark.extra_info["chunk_size"] = params.chunk_size_bytes + benchmark.extra_info["pattern"] = params.pattern + benchmark.extra_info["coros"] = params.num_coros + benchmark.extra_info["rounds"] = params.rounds + benchmark.extra_info["bucket_name"] = params.bucket_name + benchmark.extra_info["bucket_type"] = params.bucket_type + benchmark.extra_info["processes"] = params.num_processes + benchmark.group = benchmark_group diff --git a/tests/perf/microbenchmarks/config.py b/tests/perf/microbenchmarks/config.py new file mode 100644 index 000000000..e302d36fc --- /dev/null +++ b/tests/perf/microbenchmarks/config.py @@ -0,0 +1,87 @@ +# nit: TODO: rename it to config_to_params.py +import itertools +import os +from typing import Dict, List + +import yaml + +from tests.perf.microbenchmarks.parameters import ReadParameters + + +def _get_params(bucket_type_filter: str ='zonal') -> Dict[str, List[ReadParameters]]: + """ + Docstring for _get_params + 1. this function output a list of readParameters. + 2. to populate the values of readparameters, use default values from config.yaml + 3. generate all possible params , ie + no. of params should be equal to bucket_type*file_size_mib, chunk_size * process * coros + you may use itertools.product + """ + params: Dict[str, List[ReadParameters]] = {} + config_path = os.path.join(os.path.dirname(__file__), "config.yaml") + with open(config_path, "r") as f: + config = yaml.safe_load(f) + + common_params = config["common"] + bucket_types = common_params["bucket_types"] + file_sizes_mib = common_params["file_sizes_mib"] + chunk_sizes_mib = common_params["chunk_sizes_mib"] + rounds = common_params["rounds"] + + bucket_map = { + "zonal": config["defaults"]["DEFAULT_RAPID_ZONAL_BUCKET"], + "regional": config["defaults"]["DEFAULT_STANDARD_BUCKET"], + } + + for workload in config["workload"]: + workload_name = workload["name"] + params[workload_name] = [] + pattern = workload["pattern"] + processes = workload["processes"] + coros = workload["coros"] + + # Create a product of all parameter combinations + product = itertools.product( + bucket_types, + file_sizes_mib, + chunk_sizes_mib, + processes, + coros, + ) + + for bucket_type, file_size_mib, chunk_size_mib, num_processes, num_coros in product: + if bucket_type_filter != bucket_type: + continue + file_size_bytes = file_size_mib * 1024 * 1024 + chunk_size_bytes = chunk_size_mib * 1024 * 1024 + bucket_name = bucket_map[bucket_type] + + if "single_file" in workload_name: + num_files = 1 + else: + num_files = num_processes * num_coros + + # Create a descriptive name for the parameter set + name = f"{workload_name}_{bucket_type}_{file_size_mib}mib_{chunk_size_mib}mib_{num_processes}p_{num_coros}c" + + params[workload_name].append( + ReadParameters( + name=name, + workload_name=workload_name, + pattern=pattern, + bucket_name=bucket_name, + bucket_type=bucket_type, + num_coros=num_coros, + num_processes=num_processes, + num_files=num_files, + rounds=rounds, + chunk_size_bytes=chunk_size_bytes, + file_size_bytes=file_size_bytes, + ) + ) + return params +if __name__ == "__main__": + import sys + params = _get_params() + print('keys (num_workload in params', len(params), sys.getsizeof(params)) + print(params['read_seq'], len(params['read_seq'])) diff --git a/tests/perf/microbenchmarks/config.yaml b/tests/perf/microbenchmarks/config.yaml new file mode 100644 index 000000000..3fc61ec6d --- /dev/null +++ 
b/tests/perf/microbenchmarks/config.yaml @@ -0,0 +1,50 @@ +common: + bucket_types: + - "regional" + - "zonal" + file_sizes_mib: + - 10 # 1GB + # chunk_sizes_mib: [1, 2, 8, 64, 100, 128] + chunk_sizes_mib: [64] + rounds: 10 + +workload: + + # one file will be divided equally among coros + - name: "read_seq_single_file" + pattern: "seq" + coros: [1] + processes: [1] + + # one file will be divided equally among processes and coros + - name: "read_seq_multi_process_single_file" + pattern: "seq" + coros: [1,2,4,8] + processes: [2, 4, 8, 16, 32, 64] + + # num_processs * num_coros files will be created. + - name: "read_seq" + pattern: "seq" + coros: [1,2,4,8] + processes: [1] + + # num_processs * num_coros files will be created. + - name: "read_seq_multi_process" + pattern: "seq" + coros: [1,2,4,8] + processes: [2, 4, 8, 16, 32, 64] + + # for random reads always `num_processs * num_coros` will be created. + - name: "read_rand" + pattern: "rand" + coros: [1, 2,4,8] + processes: [1] + + - name: "read_rand_multi_process" + pattern: "rand" + coros: [1,2,4,8] + processes: [2, 4, 8, 16, 32, 64] + +defaults: + DEFAULT_RAPID_ZONAL_BUCKET: "chandrasiri-rs" + DEFAULT_STANDARD_BUCKET: "gcs-pytest-benchmark-standard-bucket" diff --git a/tests/perf/microbenchmarks/conftest.py b/tests/perf/microbenchmarks/conftest.py new file mode 100644 index 000000000..a868a790a --- /dev/null +++ b/tests/perf/microbenchmarks/conftest.py @@ -0,0 +1,113 @@ +# from tests.system.conftest import blobs_to_delete + +# __all__ = [ + +# "blobs_to_delete", +# ] + +import contextlib +from typing import Any +from tests.perf.microbenchmarks.resource_monitor import ResourceMonitor +import pytest +from tests.system._helpers import delete_blob + +import asyncio +import multiprocessing +import os +import uuid +from google.cloud import storage +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient + +_OBJECT_NAME_PREFIX = "micro-benchmark-" + + +@pytest.fixture(scope="function") +def blobs_to_delete(): + blobs_to_delete = [] + + yield blobs_to_delete + + for blob in blobs_to_delete: + delete_blob(blob) + + +@pytest.fixture(scope="session") +def storage_client(): + from google.cloud.storage import Client + + client = Client() + with contextlib.closing(client): + yield client + +@pytest.fixture +def monitor(): + """ + Provides the ResourceMonitor class. + Usage: with monitor() as m: ... + """ + return ResourceMonitor + +def publish_resource_metrics(benchmark: Any, monitor: ResourceMonitor) -> None: + """ + Helper function to publish resource monitor results to the extra_info property. 
+ """ + benchmark.extra_info.update( + { + "cpu_max_global": f"{monitor.max_cpu:.2f}", + "mem_max": f"{monitor.max_mem:.2f}", + "net_throughput_mb_s": f"{monitor.throughput_mb_s:.2f}", + "vcpus": monitor.vcpus, + } + ) + + +async def upload_appendable_object(bucket_name, object_name, object_size, chunk_size): + writer = AsyncAppendableObjectWriter( + AsyncGrpcClient().grpc_client, bucket_name, object_name + ) + await writer.open() + uploaded_bytes = 0 + while uploaded_bytes < object_size: + bytes_to_upload = min(chunk_size, object_size - uploaded_bytes) + await writer.append(os.urandom(bytes_to_upload)) + uploaded_bytes += bytes_to_upload + await writer.close(finalize_on_close=True) + + +def _upload_worker(args): + bucket_name, object_name, object_size, chunk_size = args + asyncio.run( + upload_appendable_object(bucket_name, object_name, object_size, chunk_size) + ) + return object_name + + +def _create_files(num_files, bucket_name, object_size, chunk_size=128 * 1024 * 1024): + """ + 1. using upload_appendable_object implement this and return a list of file names. + """ + object_names = [ + f"{_OBJECT_NAME_PREFIX}{uuid.uuid4().hex[:5]}" for _ in range(num_files) + ] + + results = [] + print(f"uploading {num_files} files") + for i in range(num_files): + args = (bucket_name, object_names[i], object_size, chunk_size) + results.append(_upload_worker(args)) + + return results + + +@pytest.fixture +def workload_params(request): + params = request.param + files_names = _create_files( + params.num_files, + params.bucket_name, + params.file_size_bytes, + ) + return params, files_names diff --git a/tests/perf/microbenchmarks/parameters.py b/tests/perf/microbenchmarks/parameters.py new file mode 100644 index 000000000..afa3307c2 --- /dev/null +++ b/tests/perf/microbenchmarks/parameters.py @@ -0,0 +1,28 @@ +from dataclasses import dataclass + +# import os + +# # Get the benchmark bucket name from the GCS_BENCHMARK_BUCKET environment +# # variable. If the environment variable is not set, a default name is used. +# _DEFAULT_RAPID_ZONAL_BUCKET = "chandrasiri-rs" +# _DEFAULT_STANDARD_BUCKET = "gcs-pytest-benchmark-standard-bucket" + +# # env name should have prefix "GCS_PY_SDK_BENCH_" + default varible name. 
diff --git a/tests/perf/microbenchmarks/parameters.py b/tests/perf/microbenchmarks/parameters.py
new file mode 100644
index 000000000..afa3307c2
--- /dev/null
+++ b/tests/perf/microbenchmarks/parameters.py
@@ -0,0 +1,28 @@
+from dataclasses import dataclass
+
+# import os
+
+# # Get the benchmark bucket name from the GCS_BENCHMARK_BUCKET environment
+# # variable. If the environment variable is not set, a default name is used.
+# _DEFAULT_RAPID_ZONAL_BUCKET = "chandrasiri-rs"
+# _DEFAULT_STANDARD_BUCKET = "gcs-pytest-benchmark-standard-bucket"
+
+# # The env var name should have the prefix "GCS_PY_SDK_BENCH_" + the default variable name.
+# RAPID_ZONAL_BUCKET = os.environ.get("GCS_PY_SDK_BENCH_RAPID_ZONAL_BUCKET", _DEFAULT_RAPID_ZONAL_BUCKET)
+# STANDARD_BUCKET = os.environ.get("GCS_PY_SDK_BENCH_STANDARD_BUCKET", _DEFAULT_STANDARD_BUCKET)
+
+
+@dataclass
+class ReadParameters:
+    """One concrete read-benchmark configuration generated from config.yaml."""
+
+    name: str
+    workload_name: str
+    pattern: str
+    bucket_name: str
+    bucket_type: str
+    num_coros: int
+    num_processes: int
+    num_files: int
+    rounds: int
+    chunk_size_bytes: int
+    file_size_bytes: int
diff --git a/tests/perf/microbenchmarks/resource_monitor.py b/tests/perf/microbenchmarks/resource_monitor.py
new file mode 100644
index 000000000..317a80ebb
--- /dev/null
+++ b/tests/perf/microbenchmarks/resource_monitor.py
@@ -0,0 +1,86 @@
+import threading
+import time
+
+import psutil
+
+
+class ResourceMonitor:
+    """Samples CPU, memory, and network usage for the current process tree."""
+
+    def __init__(self):
+        self.interval = 1.0
+
+        self.vcpus = psutil.cpu_count() or 1
+        self.max_cpu = 0.0
+        self.max_mem = 0.0  # peak RSS in bytes (process + children)
+
+        # Network and time tracking
+        self.start_time = 0.0
+        self.duration = 0.0
+        self.start_net = None
+        self.net_sent_mb = 0.0
+        self.net_recv_mb = 0.0
+
+        self._stop_event = threading.Event()
+        self._thread = None
+
+    def __enter__(self):
+        self.start_net = psutil.net_io_counters()
+        self.start_time = time.perf_counter()
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.stop()
+        self.duration = time.perf_counter() - self.start_time
+        end_net = psutil.net_io_counters()
+
+        self.net_sent_mb = (end_net.bytes_sent - self.start_net.bytes_sent) / (
+            1024 * 1024
+        )
+        self.net_recv_mb = (end_net.bytes_recv - self.start_net.bytes_recv) / (
+            1024 * 1024
+        )
+
+    def _monitor(self):
+        # Prime psutil's CPU counters so the first reading is meaningful.
+        psutil.cpu_percent(interval=None)
+        current_process = psutil.Process()
+        while not self._stop_event.is_set():
+            try:
+                # CPU and memory tracking for the current process tree
+                total_cpu = current_process.cpu_percent(interval=None)
+                current_mem = current_process.memory_info().rss
+                for child in current_process.children(recursive=True):
+                    try:
+                        total_cpu += child.cpu_percent(interval=None)
+                        current_mem += child.memory_info().rss
+                    except (psutil.NoSuchProcess, psutil.AccessDenied):
+                        continue
+
+                # Normalize CPU by the number of vCPUs
+                global_cpu = total_cpu / self.vcpus
+
+                if global_cpu > self.max_cpu:
+                    self.max_cpu = global_cpu
+                if current_mem > self.max_mem:
+                    self.max_mem = current_mem
+            except psutil.NoSuchProcess:
+                pass
+
+            time.sleep(self.interval)
+
+    def start(self):
+        self._thread = threading.Thread(target=self._monitor, daemon=True)
+        self._thread.start()
+
+    def stop(self):
+        self._stop_event.set()
+        if self._thread:
+            self._thread.join()
+
+    @property
+    def throughput_mb_s(self):
+        """Combined (sent + received) network throughput in MB/s over the monitored window."""
+        if self.duration <= 0:
+            return 0.0
+        return (self.net_sent_mb + self.net_recv_mb) / self.duration
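A minimal standalone sketch of the monitor above, matching the with-statement usage described in the `monitor` fixture docstring:

    import time

    from tests.perf.microbenchmarks.resource_monitor import ResourceMonitor

    # Sample CPU/memory/network once per second while the block runs.
    with ResourceMonitor() as m:
        time.sleep(3)  # stand-in for the benchmarked download

    print(m.max_cpu, m.max_mem, m.throughput_mb_s)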
+ + +""" + +import os +import time +import asyncio +import math +from io import BytesIO + +import pytest +from google.api_core import exceptions +from google.cloud.storage.blob import Blob + +from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient +from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import ( + AsyncMultiRangeDownloader, +) +from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + AsyncAppendableObjectWriter, +) +from tests.perf.microbenchmarks._utils import publish_benchmark_extra_info +from tests.perf.microbenchmarks.conftest import ( + # publish_multi_process_benchmark_extra_info, + publish_resource_metrics, +) + +# from tests.perf.microbenchmarks.config import RAPID_ZONAL_BUCKET, STANDARD_BUCKET +import tests.perf.microbenchmarks.config as config + + +# from functools import partial + +# Pytest-asyncio mode needs to be auto +pytest_plugins = "pytest_asyncio" + +# OBJECT_SIZE = 1024 * (1024**2) # 1 GiB +UPLOAD_CHUNK_SIZE = 128 * 1024 * 1024 +DOWNLOAD_CHUNK_SIZES = [ + # 4 * 1024 * 1024, + # 16 * 1024 * 1024, + # 32 * 1024 * 1024, + 64 + * 1024 + * 1024, +] + +all_zonal_params = config._get_params() +all_regional_params = config._get_params(bucket_type_filter="regional") + + +async def _download_one_async(async_grpc_client, object_name, other_params): + + download_size = other_params.file_size_bytes + chunk_size = other_params.chunk_size_bytes + + # DOWNLOAD + mrd = AsyncMultiRangeDownloader( + async_grpc_client, other_params.bucket_name, object_name + ) + await mrd.open() + + output_buffer = BytesIO() + # download in chunks of `chunk_size` + offset = 0 + output_buffer = BytesIO() + while offset < download_size: + bytes_to_download = min(chunk_size, download_size - offset) + await mrd.download_ranges([(offset, bytes_to_download, output_buffer)]) + offset += bytes_to_download + + assert output_buffer.getbuffer().nbytes == download_size, f"downloaded size incorrect for {object_name}" + print('downloaded bytes', output_buffer.getbuffer().nbytes) + + await mrd.close() + + +async def create_client(): + """Initializes async client and gets the current event loop.""" + return AsyncGrpcClient().grpc_client + + +async def upload_appendable_object( + client, bucket_name, object_name, object_size, chunk_size +): + writer = AsyncAppendableObjectWriter(client, bucket_name, object_name) + await writer.open() + uploaded_bytes = 0 + while uploaded_bytes < object_size: + bytes_to_upload = min(chunk_size, object_size - uploaded_bytes) + await writer.append(os.urandom(bytes_to_upload)) + uploaded_bytes += bytes_to_upload + await writer.close(finalize_on_close=False) + # print('uploading took', time.) + + +def my_setup( + loop, client, bucket_name: str, object_name: str, upload_size: int, chunk_size: int +): + loop.run_until_complete( + upload_appendable_object( + client, bucket_name, object_name, upload_size, chunk_size + ) + ) + + +def download_one_object_wrapper(loop, client, filename, other_params): + loop.run_until_complete(_download_one_async(client, filename, other_params)) + + +@pytest.mark.parametrize( + "workload_params", + all_zonal_params["read_seq_single_file"], + indirect=True, + ids=lambda p: p.name, +) +# params - num_rounds or env var ? 
+def test_downloads_one_object(
+    benchmark, storage_client, blobs_to_delete, monitor, workload_params
+):
+    """Benchmarks downloading a single object with pytest-benchmark.
+
+    The object is uploaded by the ``workload_params`` fixture;
+    ``download_one_object_wrapper`` is the measured target and there is
+    no teardown function for now.
+    """
+    params, files_names = workload_params
+    publish_benchmark_extra_info(benchmark, params)
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    client = loop.run_until_complete(create_client())
+
+    try:
+        with monitor() as m:
+            benchmark.pedantic(
+                target=download_one_object_wrapper,
+                iterations=1,
+                rounds=params.rounds,
+                args=(loop, client, files_names[0], params),
+            )
+    finally:
+        tasks = asyncio.all_tasks(loop=loop)
+        for task in tasks:
+            task.cancel()
+        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
+        loop.close()
+
+    object_size = params.file_size_bytes
+
+    min_throughput = (object_size / (1024 * 1024)) / benchmark.stats["max"]
+    max_throughput = (object_size / (1024 * 1024)) / benchmark.stats["min"]
+    mean_throughput = (object_size / (1024 * 1024)) / benchmark.stats["mean"]
+    median_throughput = (object_size / (1024 * 1024)) / benchmark.stats["median"]
+
+    benchmark.extra_info["throughput_MiB_s_min"] = min_throughput
+    benchmark.extra_info["throughput_MiB_s_max"] = max_throughput
+    benchmark.extra_info["throughput_MiB_s_mean"] = mean_throughput
+    benchmark.extra_info["throughput_MiB_s_median"] = median_throughput
+
+    print("\nThroughput Statistics (MiB/s):")
+    print(f"  Min: {min_throughput:.2f} (from max time)")
+    print(f"  Max: {max_throughput:.2f} (from min time)")
+    print(f"  Mean: {mean_throughput:.2f} (approx, from mean time)")
+    print(f"  Median: {median_throughput:.2f} (approx, from median time)")
+
+    # Get benchmark name, rounds, and iterations
+    name = benchmark.name
+    rounds = benchmark.stats["rounds"]
+    iterations = benchmark.stats["iterations"]
+
+    # Header for the throughput table
+    header = "\n\n" + "-" * 125 + "\n"
+    header += "Throughput Benchmark (MiB/s)\n"
+    header += "-" * 125 + "\n"
+    header += f"{'Name':<50} {'Min':>10} {'Max':>10} {'Mean':>10} {'StdDev':>10} {'Median':>10} {'Rounds':>8} {'Iterations':>12}\n"
+    header += "-" * 125
+
+    # Data row for the throughput table
+    # The column headers (Min, Max) refer to the throughput values.
+    row = f"{name:<50} {min_throughput:>10.4f} {max_throughput:>10.4f} {mean_throughput:>10.4f} {'N/A':>10} {median_throughput:>10.4f} {rounds:>8} {iterations:>12}"
+
+    print(header)
+    print(row)
+    print("-" * 125)
+
+    publish_resource_metrics(benchmark, m)
+    print("bucket name:", params.bucket_name)
+    print("object names:", files_names)
+
+    blobs_to_delete.extend(
+        storage_client.bucket(params.bucket_name).blob(f) for f in files_names
+    )
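For reference, a short sketch of the throughput arithmetic the test publishes (the timings below are illustrative placeholders, not measured values):

    # Hypothetical per-round timings (seconds) for a 10 MiB object, in the
    # same shape as the pytest-benchmark stats used above.
    object_size_mib = 10
    stats = {"min": 0.25, "max": 0.50, "mean": 0.40, "median": 0.38}

    max_throughput = object_size_mib / stats["min"]    # fastest round -> 40 MiB/s
    min_throughput = object_size_mib / stats["max"]    # slowest round -> 20 MiB/s
    mean_throughput = object_size_mib / stats["mean"]  # ~25 MiB/s
    print(min_throughput, max_throughput, mean_throughput)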