Switching from AWS S3 (boto3) to Google Cloud Storage (google-cloud-storage) in Python

October 12, 2018
1 comment Python

I'm in the midst of rewriting a big app that currently uses AWS S3 and will soon be switched over to Google Cloud Storage. This blog post is a rough attempt to log various activities in both Python libraries:

Disclaimer: I'm manually copying these snippets from a real project and I have to manually scrub the code clean of unimportant quirks, hacks, and other unrelated things that would just add noise.

Install

boto3


$ pip install boto3
$ emacs ~/.aws/credentials

google-cloud-storage


$ pip install google-cloud-storage
$ cat ./google_service_account.json

Note: You need to create a service account, which gives you a .json key file to download. Make sure you pass the path to that file when you create a client.

I suspect there are more/other ways to do this with environment variables alone but I haven't got there yet.

Making a "client"

boto3

Note, there are easier shortcuts for this, but with this pattern you have full control over things like read_timeout, connect_timeout, etc. via the config_params keyword arguments.


import boto3
from botocore.config import Config


def get_s3_client(region_name=None, **config_params):
    options = {"config": Config(**config_params)}
    if region_name:
        options["region_name"] = region_name
    session = boto3.session.Session()
    return session.client("s3", **options)

google-cloud-storage


from google.cloud import storage


def get_gcs_client():
    return storage.Client.from_service_account_json(
        settings.GOOGLE_APPLICATION_CREDENTIALS_PATH
    )

Checking if a bucket exists and if you have access to it

boto3 (for s3_client here, see above)


from botocore.exceptions import ClientError, EndpointConnectionError


try:

    s3_client.head_bucket(Bucket=bucket_name)
except ClientError as exception:
    if exception.response["Error"]["Code"] in ("403", "404"):
        raise BucketHardError(
            f"Unable to connect to bucket={bucket_name!r} "
            f"ClientError ({exception.response!r})"
        )
    else:
        raise
except EndpointConnectionError:
    raise BucketSoftError(
        f"Unable to connect to bucket={bucket_name!r} "
        f"EndpointConnectionError"
    )
else:
    print("It exists and we have access to it.")

google-cloud-storage


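from google.api_core.exceptions import BadRequest, Forbidden, NotFound


try:
    gcs_client.get_bucket(bucket_name)
# get_bucket raises NotFound for a missing bucket and Forbidden when
# access is denied; BadRequest covers things like an invalid bucket name.
except (BadRequest, Forbidden, NotFound) as exception: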
    raise BucketHardError(
        f"Unable to connect to bucket={bucket_name!r}, "
        f"because bucket not found due to {exception}"
    )
else:
    print("It exists and we have access to it.")

Checking if an object exists

boto3


from botocore.exceptions import ClientError


def key_existing(client, bucket_name, key):
    """return a tuple of (
        key's size if it exists or 0,
        S3 key metadata
    )
    If the object doesn't exist, return None for the metadata.
    """
    try:
        response = client.head_object(Bucket=bucket_name, Key=key)
        return response["ContentLength"], response.get("Metadata")
    except ClientError as exception:
        if exception.response["Error"]["Code"] == "404":
            return 0, None
        raise

Note, if you do this a lot and often find that the object doesn't exist, then using list_objects_v2 is probably faster.
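Here's a minimal sketch of what that list_objects_v2 variant could look like. The function name key_size_via_listing is made up for illustration, and it assumes client is a boto3 S3 client like the one from get_s3_client above. Note that a listing only gives you the size, not the user metadata that head_object returns.

```python
def key_size_via_listing(client, bucket_name, key):
    """Return the object's size if it exists, else 0.

    `client` is assumed to be a boto3 S3 client.
    """
    response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
    # "Contents" is absent from the response when nothing matches.
    for obj in response.get("Contents", []):
        # Prefix matching also returns e.g. "foo.txt.bak" for "foo.txt",
        # so compare the full key.
        if obj["Key"] == key:
            return obj["Size"]
    return 0
```

The comparison on the full key matters because Prefix is only a prefix match, so a listing for one key can contain other keys that merely start with the same characters.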

google-cloud-storage


def key_existing(client, bucket_name, key):
    """return a tuple of (
        key's size if it exists or 0,
        S3 key metadata
    )
    If the object doesn't exist, return None for the metadata.
    """
    bucket = client.get_bucket(bucket_name)
    blob = bucket.get_blob(key)
    if blob:
        return blob.size, blob.metadata
    return 0, None

Uploading a file with a special Content-Encoding

Note: You have to use your imagination with regards to the source. In this example, I'm assuming that the source is a file on disk and that it might have already been compressed with gzip.

boto3


def upload(file_path, bucket_name, key_name, metadata=None, compressed=False):
    content_type = get_key_content_type(key_name)

    metadata = metadata or {}

    # boto3 will raise a botocore.exceptions.ParamValidationError
    # error if you try to do something like:
    #
    #  s3.put_object(Bucket=..., Key=..., Body=..., ContentEncoding=None)
    #
    # ...because apparently 'NoneType' is not a valid type.
    # We /could/ set it to something like '' but that feels like an
    # actual value/opinion. Better just avoid if it's not something
    # really real.
    extras = {}
    if content_type:
        extras["ContentType"] = content_type
    if compressed:
        extras["ContentEncoding"] = "gzip"
    if metadata:
        extras["Metadata"] = metadata

    with open(file_path, "rb") as f:
        s3_client.put_object(Bucket=bucket_name, Key=key_name, Body=f, **extras)

google-cloud-storage


def upload(file_path, bucket_name, key_name, metadata=None, compressed=False):
    content_type = get_key_content_type(key_name)

    metadata = metadata or {}
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.blob(key_name)

    if content_type:
        blob.content_type = content_type
    if compressed:
        blob.content_encoding = "gzip"
    blob.metadata = metadata
    with open(file_path, "rb") as f:
        blob.upload_from_file(f)

Downloading and uncompressing a gzipped object

boto3


from io import BytesIO
from gzip import GzipFile
from botocore.exceptions import ClientError

from .utils import iter_lines


def get_stream(bucket_name, key_name):
    try:
        response = s3_client.get_object(
            Bucket=bucket_name, Key=key_name
        )
    except ClientError as exception:
        if exception.response["Error"]["Code"] == "NoSuchKey":
            raise KeyHardError("key not in bucket")
        raise

    stream = response["Body"]
    # But if the content encoding is gzip we have to re-wrap the stream.
    if response.get("ContentEncoding") == "gzip":
        body = response["Body"].read()
        bytestream = BytesIO(body)
        stream = GzipFile(None, "rb", fileobj=bytestream)

    for line in iter_lines(stream):
        yield line.decode("utf-8")
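The re-wrapping step can be tried without S3 at all. This is a minimal sketch using only the standard library. The iter_lines function here is a hypothetical stand-in, since the project's own helper from .utils isn't shown, and decoded_lines is a made-up name.

```python
from gzip import GzipFile, compress
from io import BytesIO


def iter_lines(stream):
    # Hypothetical stand-in for the project's own `iter_lines` helper:
    # yield each line of a binary stream without its trailing newline.
    for line in stream:
        yield line.rstrip(b"\r\n")


def decoded_lines(body, content_encoding=None):
    """Yield text lines from raw bytes, gunzipping first if needed."""
    stream = BytesIO(body)
    if content_encoding == "gzip":
        # Same re-wrap as in get_stream above.
        stream = GzipFile(None, "rb", fileobj=stream)
    for line in iter_lines(stream):
        yield line.decode("utf-8")


payload = "first\nsecond\n".encode("utf-8")
plain = list(decoded_lines(payload))
gzipped = list(decoded_lines(compress(payload), "gzip"))
```

Both plain and gzipped end up holding the same two lines, which is the whole point of the re-wrap.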

google-cloud-storage


from io import BytesIO

from .utils import iter_lines


def get_stream(bucket_name, key_name):
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.get_blob(key_name)
    if blob is None:
        raise KeyHardError("key not in bucket")

    bytestream = BytesIO()
    blob.download_to_file(bytestream)
    bytestream.seek(0)

    for line in iter_lines(bytestream):
        yield line.decode("utf-8")

Note that blob.download_to_file works a bit like requests.get() here, in that it automatically notices the Content-Encoding metadata and does the gunzip on the fly.

Conclusion

It's not fair to compare them on style because I think boto3 came out of boto which probably started back in the day when Google was just web search and web emails.

I wanted to include a section about how to unit test against these, especially how to mock them. But what I had for a draft was getting ugly. Yes, it works for the testing needs I have in my app but it's very personal taste (aka. appropriate for the context) and admittedly quite messy.

Fancy linkifying of text with Bleach and domain checks (with Python)

October 10, 2018
2 comments Python, Web development

Bleach is awesome. Thank you for it @willkg! It's a Python library for sanitizing text as well as "linkifying" text for HTML use. For example, consider this:

>>> import bleach
>>> bleach.linkify("Here is some text with a url.com.")
'Here is some text with a <a href="http://url.com" rel="nofollow">url.com</a>.'

Note that sanitizing is a separate thing, but if you're curious, consider this example:

>>> bleach.linkify(bleach.clean("Here is <script> some text with a url.com."))
'Here is &lt;script&gt; some text with a <a href="http://url.com" rel="nofollow">url.com</a>.'

With that output you can confidently template interpolate that string straight into your HTML.

Getting fancy

That's a great start but I wanted more. For one, I don't always want the rel="nofollow" attribute on all links, in particular not for links within the site. Secondly, a lot of things look like a domain but aren't. For example, "This is a text.at the start" would naively become:

>>> bleach.linkify("This is a text.at the start")
'This is a <a href="http://text.at" rel="nofollow">text.at</a> the start'

...because text.at looks like a domain.

So here is how I use it here on www.peterbe.com to linkify blog comments:


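from urllib.parse import urlparse

import bleach
import requests
from django.conf import settings  # assumption: this runs inside a Django project
from requests.exceptions import ConnectionError


def custom_nofollow_maker(attrs, new=False):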
    href_key = (None, u"href")

    if href_key not in attrs:
        return attrs

    if attrs[href_key].startswith(u"mailto:"):
        return attrs

    p = urlparse(attrs[href_key])
    if p.netloc not in settings.NOFOLLOW_EXCEPTIONS:
        # Before we add the `rel="nofollow"` let's first check that this is a
        # valid domain at all.
        root_url = p.scheme + "://" + p.netloc
        try:
            response = requests.head(root_url)
            if response.status_code == 301:
                redirect_p = urlparse(response.headers["location"])
                # If the only difference is that it redirects to https instead
                # of http, then amend the href.
                if (
                    redirect_p.scheme == "https"
                    and p.scheme == "http"
                    and p.netloc == redirect_p.netloc
                ):
                    attrs[href_key] = attrs[href_key].replace("http://", "https://")

        except ConnectionError:
            return None

        rel_key = (None, u"rel")
        rel_values = [val for val in attrs.get(rel_key, "").split(" ") if val]
        if "nofollow" not in [rel_val.lower() for rel_val in rel_values]:
            rel_values.append("nofollow")
        attrs[rel_key] = " ".join(rel_values)

    return attrs

html = bleach.linkify(text, callbacks=[custom_nofollow_maker])

This is basically taking the default nofollow callback and extending it a bit.

By the way, here is the complete code I use for sanitizing and linkifying blog comments here on this site: render_comment_text.

Caveats

This is slow because it requires network IO every time a piece of text needs to be linkified (if it has domain looking things in it) but that's best alleviated by only doing it once and either caching it or persistently storing the cleaned and rendered output.

Also, the check uses try: requests.head() except requests.exceptions.ConnectionError: as the method to see if the domain works. I considered doing a whois lookup or something but that felt a little wrong because just because a domain exists doesn't mean there's a website there. Either way, it could be that the domain/URL is perfectly fine but, in the very unlucky instant you checked, your own server's internet connection or some DNS lookup was busted. Perhaps a better approach is to wrap it in a retry and catch requests.exceptions.RetryError instead.

Lastly, the business logic I chose was to rewrite all http:// to https:// only if the URL http://domain does a 301 redirect to https://domain. So if the original link was http://bit.ly/redirect-slug it leaves it as is. Perhaps a fancier version would be to look at the domain name ending. For example HEAD http://google.com 301 redirects to https://www.google.com so you could use the fact that "www.google.com".endswith("google.com").
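A minimal sketch of that fancier check, using only urllib.parse. The function name should_upgrade_to_https is made up for illustration and this is not what the site currently runs. One deliberate difference from a bare endswith: it requires a "." boundary, because "notgoogle.com".endswith("google.com") is also true.

```python
from urllib.parse import urlparse


def should_upgrade_to_https(original_url, redirect_url):
    """True if an http:// URL redirects to https:// on the same domain
    or on a subdomain of it (e.g. google.com -> www.google.com)."""
    original = urlparse(original_url)
    redirect = urlparse(redirect_url)
    if original.scheme != "http" or redirect.scheme != "https":
        return False
    if redirect.netloc == original.netloc:
        return True
    # Require a "." boundary so that "notgoogle.com" doesn't match
    # "google.com" the way a bare endswith() would.
    return redirect.netloc.endswith("." + original.netloc)
```

With this, http://google.com redirecting to https://www.google.com counts as an upgrade, while http://bit.ly/redirect-slug redirecting to some other domain does not.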

UPDATE Oct 10 2018

Moments after publishing this, I discovered a bug where it would fail badly if the text contained a URL with an ampersand in it. Turns out, it was a known bug in Bleach. It only happens when you try to pass a filter to the bleach.Cleaner() class.

So I simplified my code and now things work. Apparently, using bleach.Cleaner(filters=[...]) is faster so I'm losing that. But, for now, that's OK in my context.

Also, in another later fix, I improved the function some more by avoiding non-HTTP links (with the exception of mailto: and tel:). Otherwise it would attempt to run requests.head('ssh://server.example.com') which doesn't make sense.

The ideal number of workers in Jest

October 8, 2018
0 comments Python, React

tl;dr; Use --runInBand when running jest in CI and use --maxWorkers=3 on your laptop.

Running out of memory on CircleCI

We have a test suite that covers 236 tests across 68 suites and mainly runs a bunch of enzyme rendering of React components but also some plain old JavaScript function tests. We hit a problem where tests utterly failed in CircleCI due to running out of memory. Several individual tests, before the run gave up or failed, were reported to take up to 45 seconds.
Turns out, jest tried to use 36 workers because the Docker OS it was running was reporting 36 CPUs.


circleci@9e4c489cf76b:~/repo$ node
> var os = require('os')
undefined
> os.cpus().length
36

After forcibly setting --maxWorkers=2 to the jest command, the tests passed and it took 20 seconds. Yay!

But that got me thinking, what is the ideal number of workers when I'm running the suite here on my laptop? To find out, I wrote a Python script that would wrap the call CI=true yarn run test --maxWorkers=%(WORKERS) repeatedly and report which number is ideal for my laptop.
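The script itself isn't included here, so the following is only a rough sketch of how such a wrapper could look, not the original. The names jest_argv, time_command and rank_workers are made up, and it assumes the suite is started with yarn run test with CI=true set in the environment, as described above.

```python
import os
import subprocess
import time


def jest_argv(workers):
    # Assumption: the suite is started with `yarn run test`, as in the post.
    return ["yarn", "run", "test", "--maxWorkers={}".format(workers)]


def time_command(argv):
    """Run a command with CI=true set and return its wall-clock seconds."""
    env = dict(os.environ, CI="true")
    start = time.perf_counter()
    subprocess.run(argv, check=True, capture_output=True, env=env)
    return time.perf_counter() - start


def rank_workers(candidates, repeats=3, make_argv=jest_argv, timer=time_command):
    """Return [(workers, best_seconds), ...] sorted by best time."""
    results = []
    for workers in candidates:
        # Keep the best of several runs to smooth out noise.
        best = min(timer(make_argv(workers)) for _ in range(repeats))
        results.append((workers, best))
    return sorted(results, key=lambda pair: pair[1])


# Usage (not run here because it needs the real test suite):
#   for workers, seconds in rank_workers(range(1, 9)):
#       print(workers, "{:.2f}s".format(seconds))
```

Taking the best time out of a few repeats is one design choice among several; a mean or median would work too.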

After leaving it running for a while it spits out this result:

SORTED BY BEST TIME:
3 8.47s
4 8.59s
6 9.12s
5 9.18s
2 9.51s
7 10.14s
8 10.59s
1 13.80s

The conclusion is vague. There is some benefit to using some small number greater than 1. If you attempt a bigger number it might backfire and take longer than necessary and if you do do that your laptop is likely to crawl and cough.

Notes and conclusions

django-pipeline and Zopfli

August 15, 2018
0 comments Python, Web development, Django

tl;dr; I wrote my own extension to django-pipeline that uses Zopfli to create .gz files from static assets collected in Django. Here's the code.

Nginx and Gzip

What I wanted was to continue to use django-pipeline which does a great job of reading a settings.BUNDLES setting and generating things like /static/js/myapp.min.a206ec6bd8c7.js. It has configurable options to not just make those files but also generate /static/js/myapp.min.a206ec6bd8c7.js.gz which means that with gzip_static in Nginx, Nginx doesn't have to Gzip compress static files on-the-fly but can basically just read them from disk. Nginx doesn't care how the file got there but an immediate advantage of preparing the file on disk is that the compression can be higher (smaller .gz files). That means smaller responses to be sent to the client and less CPU work needed from Nginx. Your job is to set gzip_static on; in your Nginx config (per location) and make sure every compressible file exists on disk with the same name but with the .gz suffix.

In other words, when the client does GET https://example.com/static/foo.js Nginx quickly does a read on the file system to see if there exists a ROOT/static/foo.js.gz and if so, returns that. If the file doesn't exist, and you have gzip on; in your config, Nginx will read the ROOT/static/foo.js into memory, compress it (usually with a lower compression level) and return that. Nginx takes care of figuring out whether to do this, at all, dynamically by reading the Accept-Encoding header from the request.

Zopfli

The best solution today to generate these .gz files is Zopfli. Zopfli is slower than good old regular gzip but the files get smaller. To manually compress a file you can install the zopfli executable (e.g. brew install zopfli or apt install zopfli) and then run zopfli $ROOT/static/foo.js which creates a $ROOT/static/foo.js.gz file.

So your task is to build some pipelining code that generates a .gz version of every static file your Django server creates.
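Stripped of everything Django, the task boils down to something like this sketch. It uses the standard library's gzip at its highest level as a stand-in for Zopfli, so the files won't be as small as real Zopfli output. The function name and the list of suffixes are made up for illustration.

```python
import gzip
from pathlib import Path

# Assumption: which file types are worth compressing.
COMPRESSIBLE_SUFFIXES = {".js", ".css", ".html", ".svg", ".json"}


def write_gz_siblings(static_root):
    """Write foo.js.gz next to every compressible file under static_root."""
    created = []
    # sorted() materializes the listing before any new files are written.
    for path in sorted(Path(static_root).rglob("*")):
        if not path.is_file() or path.suffix not in COMPRESSIBLE_SUFFIXES:
            continue
        gz_path = path.with_name(path.name + ".gz")
        # Stand-in for Zopfli: stdlib gzip at its highest level.
        gz_path.write_bytes(gzip.compress(path.read_bytes(), compresslevel=9))
        created.append(gz_path)
    return created
```

Nginx with gzip_static on; only cares that foo.js.gz sits next to foo.js, so any compressor that writes valid gzip output can be swapped in.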
At first I tried django-static-compress which has an extension to regular Django staticfiles storage. The default staticfiles storage is django.contrib.staticfiles.storage.StaticFilesStorage and that's what django-static-compress extends.

But I wanted more. I wanted all the good bits from django-pipeline (minification, hashes in filenames, concatenation, etc.) Also, in django-static-compress you can't control the parameters to zopfli such as the number of iterations. And with django-static-compress you have to install Brotli which I can't use because I don't want to compile my own Nginx.

Solution

So I wrote my own little mashup. I took some ideas from how django-pipeline does regular gzip compression as a post-process step. And in my case, I never want to bother with any of the other files that are put into the settings.STATIC_ROOT directory from the collectstatic command.

Here's my implementation: peterbecom.storage.ZopfliPipelineCachedStorage. Check it out. It's very tailored to my personal preferences and usecase but it works great. To use it, I have this in my settings.py: STATICFILES_STORAGE = "peterbecom.storage.ZopfliPipelineCachedStorage"

I know what you're thinking

Why not try to get this into django-pipeline or into django-static-compress? The answer is frankly laziness. Hopefully someone else can pick up this task. I have fewer and fewer projects where I use Django to handle static files. These days most of my projects are single-page-apps that are 100% static and using Django for XHR requests to get the data.

Django lock decorator with django-redis

August 14, 2018
4 comments Python, Web development, Django, Redis

Here's the code. It's quick-n-dirty but it works wonderfully:


import functools
import hashlib

from django.core.cache import cache
from django.utils.encoding import force_bytes


def lock_decorator(key_maker=None):
    """
    When you want to lock a function from more than 1 call at a time.
    """

    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            if key_maker:
                key = key_maker(*args, **kwargs)
            else:
                key = str(args) + str(kwargs)
            lock_key = hashlib.md5(force_bytes(key)).hexdigest()
            with cache.lock(lock_key):
                return func(*args, **kwargs)

        return inner

    return decorator

How To Use It

This has saved my bacon more than once. I use it on functions that really need to be made synchronous. For example, suppose you have a function like this:


def fetch_remote_thing(name):
    try:
        return Thing.objects.get(name=name).result
    except Thing.DoesNotExist:
        # Need to go out and fetch this
        result = some_internet_fetching(name)  # Assume this is sloooow
        Thing.objects.create(name=name, result=result)
        return result

That function is quite dangerous because if it's executed by two concurrent web requests, for example, they will trigger two "identical" calls to some_internet_fetching, and if the database didn't have the name already, it will most likely trigger two calls to Thing.objects.create(name=name, ...). That could lead to integrity errors, or if it doesn't, the whole function breaks down because it assumes that there is only 1 or 0 of these Thing records.

Easy to solve, just add the lock_decorator:


@lock_decorator()
def fetch_remote_thing(name):
    try:
        return Thing.objects.get(name=name).result
    except Thing.DoesNotExist:
        # Need to go out and fetch this
        result = some_internet_fetching(name)  # Assume this is sloooow
        Thing.objects.create(name=name, result=result)
        return result

Now, thanks to Redis distributed locks, the function is always allowed to finish before it starts another one. All the hairy locking (in particular, the waiting) is implemented deep down in Redis which is rock solid.

Bonus Usage

Another use that has also saved my bacon is functions that aren't necessarily called with the same input argument but each call is so resource intensive that you want to make sure it only does one of these at a time. Suppose you have a Django view function that does some resource intensive work and you want to stagger the calls so that it only runs it one at a time. Like this for example:


def api_stats_calculations(request, part):
    if part == 'users-per-month':
        data = _calculate_users_per_month()  # expensive
    elif part == 'pageviews-per-week':
        data = _calculate_pageviews_per_week()  # intensive
    elif part == 'downloads-per-day':
        data = _calculate_download_per_day()  # slow
    elif you == 'get' and the == 'idea':
        ...

    return http.JsonResponse({'data': data})

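If you just put @lock_decorator() on this Django view function, and you have some (almost) concurrent calls to this function, for example from a uWSGI server running with threads and multiple processes, then it will not synchronize the calls. That's because the default lock key is derived from str(args) + str(kwargs), so calls with a different part (or a different request) get different lock keys and don't wait for each other.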

The solution to this is to write your own function for generating the lock key, like this for example:


@lock_decorator(
    key_maker=lambda request, part: 'api_stats_calculations'
)
def api_stats_calculations(request, part):
    if part == 'users-per-month':
        data = _calculate_users_per_month()  # expensive
    elif part == 'pageviews-per-week':
        data = _calculate_pageviews_per_week()  # intensive
    elif part == 'downloads-per-day':
        data = _calculate_download_per_day()  # slow
    elif you == 'get' and the == 'idea':
        ...

    return http.JsonResponse({'data': data})

Now it works.
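To see why, here's a small sketch that mirrors just the key logic from lock_decorator, without Django or Redis. It uses str.encode() in place of force_bytes, plain strings instead of real request objects, and the helper name make_lock_key is made up.

```python
import hashlib


def make_lock_key(key_maker, *args, **kwargs):
    # Mirrors the key logic inside lock_decorator above, using
    # str.encode() in place of Django's force_bytes.
    if key_maker:
        key = key_maker(*args, **kwargs)
    else:
        key = str(args) + str(kwargs)
    return hashlib.md5(key.encode("utf-8")).hexdigest()


# Default key: a different `part` means a different lock.
default_a = make_lock_key(None, "request", "users-per-month")
default_b = make_lock_key(None, "request", "downloads-per-day")

# Custom key_maker: every call shares one lock.
fixed = lambda request, part: "api_stats_calculations"
fixed_a = make_lock_key(fixed, "request", "users-per-month")
fixed_b = make_lock_key(fixed, "request", "downloads-per-day")
```

Here default_a and default_b differ, so those two calls would not wait for each other, while fixed_a and fixed_b are identical.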

How Time-Expensive Is It?

Perhaps you worry that 99% of your calls to the function don't have the problem of calling the function concurrently. How much is this overhead of this lock costing you? I wondered that too and set up a simple stress test where I wrote a really simple Django view function. It looked something like this:


@lock_decorator(key_maker=lambda request: 'samekey')
def sample_view_function(request):
    return http.HttpResponse('Ok\n')

I started a Django server with uWSGI with multiple processes and threads enabled. Then I bombarded this function with a simple concurrent stress test and observed the requests per minute. The cost was extremely tiny and almost negligible (compared to not using the lock decorator). Granted, in this test I used Redis on redis://localhost:6379/0 but generally the conclusion was that the call is extremely fast and not something to worry too much about. But your mileage may vary so do your own experiments for your context.

What's Needed

You need to use django-redis as your Django cache backend. I've blogged before about using django-redis, for example Fastest cache backend possible for Django and Fastest Redis configuration for Django.

django-html-validator now supports Django 2.x

August 13, 2018
0 comments Python, Web development, Django

django-html-validator is a Django project that can validate your generated HTML. It does so by sending the HTML to https://html5.validator.nu/ or you can start your own Java server locally with vnu.jar from here. The output is that you can have validation errors printed to stdout or you can have them put as .txt files in a temporary directory. You can also include it in your test suite and make it so that tests fail if invalid HTML is generated during rendering in Django unit tests.

The project seems to have become a lot more popular than I thought it would. It started as a one-evening-hack and because there was interest I wrapped it up in a proper project with "docs" and set up CI for future contributions.

I kind of forgot about the project since almost all my current projects generate JSON on the server and generate the DOM on-the-fly with client-side JavaScript, but apparently a lot of issues and PRs were filed related to making it work in Django 2.x. So I took the time last night to tidy up the tox.ini etc. and make the necessary compatibility fixes to make it work with Django 1.8 up to Django 2.1. Pull request here.

Thank you all who contributed! I'll try to do a better job of noticing filed issues in the future.

Quick dog-piling (aka stampeding herd) URL stresstest

August 10, 2018
0 comments Python

Whenever you want to quickly bombard a URL with some concurrent traffic, you can use this:


import random
import time
import requests
import concurrent.futures


def _get_size(url):
    sleep = random.random() / 10
    # print("sleep", sleep)
    time.sleep(sleep)
    r = requests.get(url)
    # print(r.status_code)
    assert len(r.text)
    return len(r.text)


def run(url, times=10):
    sizes = []
    futures = []
    # One thread per request so they really do run concurrently.
    with concurrent.futures.ThreadPoolExecutor(max_workers=times) as executor:
        for _ in range(times):
            futures.append(executor.submit(_get_size, url))
        for future in concurrent.futures.as_completed(futures):
            sizes.append(future.result())
    return sizes


if __name__ == "__main__":
    import sys

    print(run(sys.argv[1]))

It's really basic but it works wonderfully. It starts 10 concurrent threads that all hit the same URL at almost the same time.
I've been using this to stress test a local Django server to test some atomicity of writes with the file system.

A good Django view function cache decorator for http.JsonResponse

June 20, 2018
0 comments Python, Web development, Django

I use this a lot. It has served me very well. The code:


import hashlib
import functools

import markus  # optional
from django.core.cache import cache
from django import http
from django.utils.encoding import force_bytes, iri_to_uri

metrics = markus.get_metrics(__name__)  # optional


def json_response_cache_page_decorator(seconds):
    """Cache only when there's a healthy http.JsonResponse response."""

    def decorator(func):

        @functools.wraps(func)
        def inner(request, *args, **kwargs):
            cache_key = 'json_response_cache:{}:{}'.format(
                func.__name__,
                hashlib.md5(force_bytes(iri_to_uri(
                    request.build_absolute_uri()
                ))).hexdigest()
            )
            content = cache.get(cache_key)
            if content is not None:

                # metrics is optional
                metrics.incr(
                    'json_response_cache_hit',
                    tags=['view:{}'.format(func.__name__)]
                )

                return http.HttpResponse(
                    content,
                    content_type='application/json'
                )
            response = func(request, *args, **kwargs)
            if (
                isinstance(response, http.JsonResponse) and
                response.status_code in (200, 304)
            ):
                cache.set(cache_key, response.content, seconds)
            return response

        return inner

    return decorator

To use it simply add to Django view functions that might return a http.JsonResponse. For example, something like this:


@json_response_cache_page_decorator(60)
def search(request):
    q = request.GET.get('q')
    if not q:
        return http.HttpResponseBadRequest('no q')
    results = search_database(q)
    return http.JsonResponse({
        'results': results,
    })

I use this instead of django.views.decorators.cache.cache_page() for a couple of reasons:

  • cache_page generates cache keys that don't contain the view function name.
  • cache_page tries to cache the whole http.HttpResponse instance which can't be serialized if you use the msgpack serializer.
  • cache_page also sends Cache-Control headers which is not always what you want.
  • Not possible to inject your own custom code such as my usage of metrics.

Disclaimer: This snippet of code comes from a side-project that has a very specific set of requirements. They're rather unique to that project and I have a full picture of the needs. E.g. I know what specific headers matter and don't matter. Your project might be different. For example, perhaps you don't have markus to handle your metrics. Or perhaps you need to re-write the query string for something to normalize the cache key differently. Point being, take the snippet of code as inspiration when you too find that django.views.decorators.cache.cache_page() isn't good enough for your Django view functions.
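As one example of normalizing the cache key differently, here's a sketch that makes the key independent of the order of the query string parameters. It only uses the standard library, takes the absolute URI as a plain string (in the decorator that would come from request.build_absolute_uri()), and the function name normalized_cache_key is made up.

```python
import hashlib
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def normalized_cache_key(func_name, absolute_uri):
    """Build a cache key that ignores query-string parameter order."""
    parts = urlsplit(absolute_uri)
    # Sort the parameters so ?a=1&b=2 and ?b=2&a=1 share one cache entry.
    query = urlencode(sorted(parse_qsl(parts.query, keep_blank_values=True)))
    normalized = urlunsplit(parts._replace(query=query, fragment=""))
    digest = hashlib.md5(normalized.encode("utf-8")).hexdigest()
    return "json_response_cache:{}:{}".format(func_name, digest)
```

Whether that's appropriate depends on the view. If parameter order carries meaning in your app, this would be the wrong normalization.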

GeneratorExit - How to clean up after the last yield in Python

June 7, 2018
7 comments Python

tl;dr; Use except GeneratorExit if your Python generator needs to know the consumer broke out.

Suppose you have a generator that yields things out. After each yield you want to execute some code that does something like logging or cleaning up. Here is one such trivialized example:

The Problem


def pump():
    numbers = [1, 2, 3, 4]
    for number in numbers:
        yield number
        print("Have sent", number)
    print("Last number was sent")


for number in pump():
    print("Got", number)

print("All done")

The output is, as expected:

Got 1
Have sent 1
Got 2
Have sent 2
Got 3
Have sent 3
Got 4
Have sent 4
Last number was sent
All done

In this scenario, the consumer of the generator (the for number in pump() loop in this example) gets everything the generator generates, so after the last yield the generator is free to do any last-minute activities which might be important, such as closing a socket or updating a database.

Suppose the consumer is getting a bit "impatient" and breaks out as soon as it has what it needed.


def pump():
    numbers = [1, 2, 3, 4]
    for number in numbers:
        yield number
        print("Have sent", number)
    print("Last number was sent")


for number in pump():
    print("Got", number)
    # THESE TWO NEW LINES
    if number >= 2:
        break

print("All done")

What do you think the output is now? I'll tell you:

Got 1
Have sent 1
Got 2
All done

In other words, the potentially important lines print("Have sent", number) (for the number 2) and print("Last number was sent") never get executed! The generator's documentation could tell the consumer "Don't break! If you don't want me any more raise a StopIteration". But that's not a feasible requirement.

The Solution

But! There is a better solution and that's to catch GeneratorExit exceptions.


def pump():
    numbers = [1, 2, 3, 4]
    try:
        for number in numbers:
            yield number
            print("Have sent", number)
    except GeneratorExit:
        print("Exception!")
    print("Last number was sent")


for number in pump():
    print("Got", number)
    if number == 2:
        break
print("All done")

Now you get what you might want:

Got 1
Have sent 1
Got 2
Exception!
Last number was sent
All done

Next Level Stuff

Note in the last example's output, it never prints Have sent 2 even though the generator really did send that number. Suppose that's an important piece of information, then you can reach that inside the except GeneratorExit block. Like this for example:


def pump():
    numbers = [1, 2, 3, 4]
    try:
        for number in numbers:
            yield number
            print("Have sent", number)
    except GeneratorExit:
        print("Have sent*", number)
    print("Last number was sent")


for number in pump():
    print("Got", number)
    if number == 2:
        break
print("All done")

And the output is:

Got 1
Have sent 1
Got 2
Have sent* 2
Last number was sent
All done

The * is just in case we wanted to distinguish between a break happening or not. Depends on your application.
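A related variation, not part of the examples above: if the cleanup should run whether or not the consumer breaks out, try/finally covers both cases. This sketch collects events in a list instead of printing, and it calls close() explicitly so the timing of the cleanup doesn't depend on when the generator object gets garbage collected.

```python
def pump(log):
    numbers = [1, 2, 3, 4]
    try:
        for number in numbers:
            yield number
            log.append("sent {}".format(number))
    finally:
        # Runs on normal exhaustion and on close() alike.
        log.append("cleanup")


log = []
generator = pump(log)
for number in generator:
    if number == 2:
        break
# close() raises GeneratorExit inside pump() at the paused yield.
generator.close()
```

The trade-off is that finally can't tell the two cases apart on its own, so if you need the "Have sent*" distinction, the except GeneratorExit version is the one to use.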

Writing a custom Datadog reporter using the Python API

May 21, 2018
2 comments Python

Datadog is an awesome software-as-a-service where you can aggregate and visualize statsd metrics sent from an application. For visualizing timings you create a time series graph. It can look something like this:

Time series

This time series looks sane because its timings are recorded very frequently. But what if the thing being timed happens very rarely, like once a day? Then the graph doesn't look very useful. See this example:

"Rare time" series

Not only is it happening rarely, the value is really quite hard to parse. I.e. what's 2.6 million milliseconds? (The answer is approximately 43 minutes.) So to solve that I used the Datadog API. Now I can get a metric of every single point in milliseconds and I can make a little data table with human-readable dates and times.

The end result looks something like this:

SCOPE: ENV:PROD
+-------------------------+------------------------+-----------------------+
|          WHEN           |        TIME AGO        |       TIME TOOK       |
+=========================+========================+=======================+
| Mon 2018-05-21T17:00:00 | 2 hours 43 minutes ago | 23 minutes 32 seconds |
+-------------------------+------------------------+-----------------------+
| Sun 2018-05-20T17:00:00 | 1 day 2 hours ago      | 20 seconds            |
+-------------------------+------------------------+-----------------------+
| Sat 2018-05-19T17:00:00 | 2 days 2 hours ago     | 20 seconds            |
+-------------------------+------------------------+-----------------------+
| Fri 2018-05-18T17:00:00 | 3 days 2 hours ago     | 2 minutes 24 seconds  |
+-------------------------+------------------------+-----------------------+
| Wed 2018-05-16T20:00:00 | 4 days 23 hours ago    | 38 minutes 38 seconds |
+-------------------------+------------------------+-----------------------+

It's not gorgeous and there are a lot of caveats but it's at least really easy to read. See the metrics.py code here.

I don't think you can run this code since you don't have the same (hardcoded) metrics but hopefully it can serve as an example to whet your appetite.
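The part that does run anywhere is the conversion from milliseconds to something readable. This is a minimal sketch of that step only, not the code from metrics.py. The function name is made up and it keeps the two largest non-zero units, which matches the look of the "TIME TOOK" column above.

```python
def humanize_milliseconds(milliseconds):
    """Format a duration using its two largest non-zero units."""
    seconds = int(round(milliseconds / 1000))
    units = (("day", 86400), ("hour", 3600), ("minute", 60), ("second", 1))
    parts = []
    for name, size in units:
        count, seconds = divmod(seconds, size)
        if count:
            plural = "" if count == 1 else "s"
            parts.append("{} {}{}".format(count, name, plural))
    return " ".join(parts[:2]) or "0 seconds"
```

For the 2.6 million milliseconds mentioned earlier, this gives "43 minutes 20 seconds".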

What I'm going to do next, if I have time, is to run this as a Flask app instead that outputs an HTML table on a Heroku app or something.