I'm in the midst of rewriting a big app that currently uses AWS S3 and will soon be switched over to Google Cloud Storage. This blog post is a rough attempt to log various activities in both Python libraries:

$ pip install boto3
$ emacs ~/.aws/credentials


$ pip install google-cloud-storage
$ cat ./google_service_account.json

Note: You need to create a service account, which gives you a .json key file to download. Make sure you pass that file's path when you create a client.

I suspect there are more/other ways to do this with environment variables alone but I haven't got there yet.

Making a "client"


Note, there are easier shortcuts for this but with this pattern you can have full control over things like read_timeout, connect_timeout, etc. with that config_params keyword.

import boto3
from botocore.config import Config

def get_s3_client(region_name=None, **config_params):
    """Build a boto3 S3 client from a fresh session.

    Any extra keyword arguments are forwarded to ``botocore.config.Config``
    (for example ``read_timeout`` or ``connect_timeout``). ``region_name``
    is only handed to the client when it is truthy.
    """
    client_kwargs = {"config": Config(**config_params)}
    if region_name:
        client_kwargs.update(region_name=region_name)
    return boto3.session.Session().client("s3", **client_kwargs)


from google.cloud import storage

def get_gcs_client(credentials_path="./google_service_account.json"):
    """Return a GCS client authenticated with a service-account JSON key.

    credentials_path: path to the downloaded service-account .json file.

    NOTE(review): the default mirrors the file name used in the setup
    step of this post -- confirm it matches where your key file lives.
    """
    return storage.Client.from_service_account_json(credentials_path)

Checking if a bucket exists and if you have access to it

boto3 (for s3_client here, see above)

from botocore.exceptions import ClientError, EndpointConnectionError


try:
    # head_bucket is a cheap request that fails with a ClientError when
    # the bucket is missing (404) or we are not allowed to use it (403).
    s3_client.head_bucket(Bucket=bucket_name)
except ClientError as exception:
    if exception.response["Error"]["Code"] in ("403", "404"):
        raise BucketHardError(
            f"Unable to connect to bucket={bucket_name!r} "
            f"ClientError ({exception.response!r})"
        ) from exception
    # Any other client error is unexpected; let it propagate untouched
    # instead of silently treating the bucket as fine.
    raise
except EndpointConnectionError as exception:
    # Network-level failure: possibly transient, hence the "soft" error.
    raise BucketSoftError(
        f"Unable to connect to bucket={bucket_name!r} "
        f"EndpointConnectionError ({exception})"
    ) from exception
else:
    print("It exists and we have access to it.")


from google.api_core.exceptions import BadRequest

try:
    # Looking the bucket up is enough to prove it exists and is reachable
    # with the current credentials.
    gcs_client.get_bucket(bucket_name)
except BadRequest as exception:
    raise BucketHardError(
        f"Unable to connect to bucket={bucket_name!r}, "
        f"because bucket not found due to {exception}"
    ) from exception
else:
    print("It exists and we have access to it.")

Checking if an object exists


from botocore.exceptions import ClientError

def key_existing(client, bucket_name, key):
    """Return a tuple of (
        key's size if it exists or 0,
        S3 key metadata
    )
    If the object doesn't exist, return None for the metadata.

    Any ClientError other than a 404 is re-raised so that permission or
    connectivity problems are not mistaken for a missing object.
    """
    try:
        response = client.head_object(Bucket=bucket_name, Key=key)
    except ClientError as exception:
        if exception.response["Error"]["Code"] == "404":
            return 0, None
        raise
    return response["ContentLength"], response.get("Metadata")

Note, if you do this a lot and often find that the object doesn't exist, then using list_objects_v2 is probably faster.


def key_existing(client, bucket_name, key):
    """Return a tuple of (
        key's size if it exists or 0,
        blob metadata
    )
    If the object doesn't exist, return None for the metadata.
    """
    bucket = client.get_bucket(bucket_name)
    blob = bucket.get_blob(key)
    # get_blob() hands back None when there is no such object.
    if blob is None:
        return 0, None
    return blob.size, blob.metadata

Uploading a file with a special Content-Encoding

Note: You have to use your imagination with regards to the source. In this example, I'm assuming that the source is a file on disk and that it might have already been compressed with gzip.


def upload(file_path, bucket_name, key_name, metadata=None, compressed=False):
    """Upload the file at ``file_path`` to S3 as ``key_name``.

    metadata: optional dict stored as the object's user metadata.
    compressed: set to True when the file on disk is already gzipped, so
        the object is stored with ``Content-Encoding: gzip``.
    """
    content_type = get_key_content_type(key_name)

    metadata = metadata or {}

    # boto3 will raise a botocore.exceptions.ParamValidationError
    # error if you try to do something like:
    #  s3.put_object(Bucket=..., Key=..., Body=..., ContentEncoding=None)
    # ...because apparently 'NoneType' is not a valid type.
    # We /could/ set it to something like '' but that feels like an
    # actual value/opinion. Better just avoid if it's not something
    # really real.
    extras = {}
    if content_type:
        extras["ContentType"] = content_type
    if compressed:
        extras["ContentEncoding"] = "gzip"
    if metadata:
        extras["Metadata"] = metadata

    with open(file_path, "rb") as f:
        s3_client.put_object(Bucket=bucket_name, Key=key_name, Body=f, **extras)


def upload(file_path, bucket_name, key_name, metadata=None, compressed=False):
    """Upload the file at ``file_path`` to GCS as ``key_name``.

    metadata: optional dict stored as the blob's metadata.
    compressed: set to True when the file on disk is already gzipped, so
        the blob is stored with a gzip content encoding.
    """
    content_type = get_key_content_type(key_name)

    metadata = metadata or {}
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.blob(key_name)

    # These attributes have to be set before the upload so they are sent
    # along with the object.
    if content_type:
        blob.content_type = content_type
    if compressed:
        blob.content_encoding = "gzip"
    blob.metadata = metadata

    # Stream the (possibly pre-gzipped) bytes straight from disk.
    with open(file_path, "rb") as f:
        blob.upload_from_file(f)

Downloading and uncompressing a gzipped object


from io import BytesIO
from gzip import GzipFile
from botocore.exceptions import ClientError

from .utils import iter_lines

def get_stream(bucket_name, key_name):
    """Yield the lines of an S3 object as UTF-8 decoded strings.

    Objects stored with ``Content-Encoding: gzip`` are decompressed
    before being split into lines.

    Raises KeyHardError if the key does not exist in the bucket; any
    other ClientError is re-raised unchanged.
    """
    try:
        response = s3_client.get_object(Bucket=bucket_name, Key=key_name)
    except ClientError as exception:
        if exception.response["Error"]["Code"] == "NoSuchKey":
            raise KeyHardError("key not in bucket") from exception
        # Don't swallow other failures; `response` would be unbound below.
        raise

    stream = response["Body"]
    # But if the content encoding is gzip we have to re-wrap the stream.
    if response.get("ContentEncoding") == "gzip":
        body = response["Body"].read()
        bytestream = BytesIO(body)
        stream = GzipFile(None, "rb", fileobj=bytestream)

    for line in iter_lines(stream):
        yield line.decode("utf-8")


from io import BytesIO
from gzip import GzipFile
from botocore.exceptions import ClientError

from .utils import iter_lines

def get_stream(bucket_name, key_name):
    """Yield the lines of a GCS object as UTF-8 decoded strings.

    Raises KeyHardError if the key does not exist in the bucket.
    """
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.get_blob(key_name)
    if blob is None:
        raise KeyHardError("key not in bucket")

    bytestream = BytesIO()
    blob.download_to_file(bytestream)
    # The download leaves the cursor at the end of the buffer; rewind so
    # the line iterator starts from the first byte.
    bytestream.seek(0)

    for line in iter_lines(bytestream):
        yield line.decode("utf-8")

Note that here blob.download_to_file works a bit like requests.get() in that it automatically notices the Content-Encoding metadata and does the gunzip on the fly.


It's not fair to compare them on style because I think boto3 came out of boto which probably started back in the day when Google was just web search and web emails.

