Token Auth

Motivation

Building applications that require authentication and protection is common at TwoPi Code.

We’ve found the most practical token exchange implementation exists with 2 different kinds of tokens and the following authentication flow:

  1. A user performs a POST to a login endpoint /api/v1/login with their credentials.

  2. The API validates these credentials, and begins the token exchange process. The API generates a refresh token, and stores it. This token must have all the information required to generate a shortlived token. A shortlived token is then generated. A possible response from this endpoint could be:

    {
        "refreshToken": "abcdefg",
        "token": "shortlived-token-could-be-a-jwt"
    }
    
  3. The client stores both these tokens. (Possibly in localstorage).

  4. Before every request, the client checks if their shortlived token has expired. If it hasn’t expired, they can just send the shortlived token as usual. However, if it is expired, a renewal must occur.

  5. The client performs a POST to the renew endpoint, providing the

    refresh token they recieved at login.

  6. The server checks if the refresh token is still valid (ie, hasn’t been revoked, or inactive for too long), and using this returns a new shortlived token. A possible response from this endpoint could be:

    {
        "token": "shortlived-token-could-be-a-jwt"
    }
    
  7. The user stores the new token. If it was found that the refresh token had been revoked, then the user will be prompted to log back into the app.

  8. When the user wishes to log out, a POST request to a logout endpoint occurs, sending the user’s shortlived token.

  9. The server revokes the shortlived token’s associated refresh token, thus, invalidating any further renewals.

There are 4 main advantages to using an authentication flow like this:

  1. Users never have to log in again as long as they stay active.
  2. Tokens are stateless (Unless you need to check for revocation on every request, but this is still a very lightweight operation.)
  3. As long as shortlived tokens have a short enough expiry, a compromised shortlived token can have little impact.
  4. Heavy token renewal/stateful operations can be minimised to happening ONLY when the token has expired.

Thus, this package exposes helpers to assist in implementing such an authentication workflow. It makes no assumptions about your refresh token objects. It does assume that your prefered short lived token type is a JWT.

Usage

Within your project, create class for your short lived token. Within this class, we need to define a few things. Let’s store a user_id, a list of scopes, and the associated refresh token on the short lived token. We can use a marshmallow schema for this:

from twopi_flask_utils.token_auth import (
    ShortlivedTokenMixin, parse_auth_header, auth_required)
from marshmallow import fields


class ShortlivedToken(ShortlivedTokenMixin):

    class TokenSchema(ShortlivedTokenMixin.TokenSchema):
        rfid = fields.String(attribute='refresh_token_id')
        user_id = fields.String(attribute='user_id')
        scopes = fields.List(fields.String(), attribute='scopes')

    def __init__(self, refresh_token_id, user_id, scopes, *args, **kwargs):
        super(ShortlivedToken, self).__init__(*args, **kwargs)

        self.refresh_token_id = refresh_token_id
        self.user_id = user_id
        self.scopes = scopes

I used shortend names like rfid in the schema to cut down on bytes transfered with the token. JWT’s are meant to be lightweight.

Finally, we need to define a way to load a ShortlivedToken from a refresh token. We define the classmethod, from_refresh_token to do this:

from twopi_flask_utils.token_auth import (
    ShortlivedTokenMixin, parse_auth_header, auth_required)
from marshmallow import fields


class ShortlivedToken(ShortlivedTokenMixin):

    class TokenSchema(ShortlivedTokenMixin.TokenSchema):
        rfid = fields.String(attribute='refresh_token_id')
        user_id = fields.String(attribute='user_id')
        scopes = fields.List(fields.String(), attribute='scopes')

    def __init__(self, refresh_token_id, user_id, scopes, *args, **kwargs):
        super(ShortlivedToken, self).__init__(*args, **kwargs)

        self.refresh_token_id = refresh_token_id
        self.user_id = user_id
        self.scopes = scopes

    @classmethod
    def from_refresh_token(Cls, refresh_token):
        # Fetch the token expiry from the application configuration
        shortlived_expiry = current_app.config['SHORT_LIVED_TOKEN_EXPIRY']

        return Cls(
            refresh_token_id=refresh_token.id,
            user_id=refresh_token.user_id,
            scopes=refresh_token.scopes,
            expiry=datetime.datetime.now(pytz.UTC) + shortlived_expiry,
        )

As mentioned above, we use information from the refresh token to build a ShortlivedToken. To complete the example, we’ll add an in-memory refresh token store and a basic implementation:

import uuid
import random
import string
random_alpha = string.digits + string.ascii_letters

class RefreshToken():
    def __init__(self, user_id, scopes):
        self.id = uuid.uuid4().hex
        self.user_id = user_id
        self.scopes = scopes
        self.token = ''.join(random.choice(random_alpha) for _ in range(80))


# Store the granted refresh tokens in memory.
refresh_tokens = []

Finally, we will create a flask app and implement the 3 endpoints for authentication. /login, /logout, and /renew. This is now the final implementation:

from flask import current_app, Flask, request, abort, jsonify, g
from twopi_flask_utils.token_auth import (
    ShortlivedTokenMixin, parse_auth_header, auth_required)
from marshmallow import fields
import uuid
import random
import string
import pytz
import datetime
import logging

log = logging.getLogger(__name__)


class ShortlivedToken(ShortlivedTokenMixin):

    class TokenSchema(ShortlivedTokenMixin.TokenSchema):
        rfid = fields.String(attribute='refresh_token_id')
        user_id = fields.String(attribute='user_id')
        scopes = fields.List(fields.String(), attribute='scopes')

    def __init__(self, refresh_token_id, user_id, scopes, *args, **kwargs):
        super(ShortlivedToken, self).__init__(*args, **kwargs)

        self.refresh_token_id = refresh_token_id
        self.user_id = user_id
        self.scopes = scopes

    @classmethod
    def from_refresh_token(Cls, refresh_token):
        # Fetch the token expiry from the application configuration
        shortlived_expiry = current_app.config['SHORT_LIVED_TOKEN_EXPIRY']

        return Cls(
            refresh_token_id=refresh_token.id,
            user_id=refresh_token.user_id,
            scopes=refresh_token.scopes,
            expiry=datetime.datetime.now(pytz.UTC) + shortlived_expiry,
        )


random_alpha = string.digits + string.ascii_letters

class RefreshToken():
    def __init__(self, user_id, scopes):
        self.id = uuid.uuid4().hex
        self.user_id = user_id
        self.scopes = scopes
        self.token = ''.join(random.choice(random_alpha) for _ in range(80))


# Store the granted refresh tokens in memory.
refresh_tokens = []

app = Flask(__name__)
app.config.update({
    'SHORT_LIVED_TOKEN_EXPIRY': datetime.timedelta(hours=1),
    'SECRET_KEY': 'supersecret'
})


@app.route('/login', methods=['POST'])
def login():
    # Check if the credentials were correct
    if request.form.get('username') != 'test' or \
            request.form.get('password') != 'test':
        abort(401)

    # Create a new refresh token
    refresh_token = RefreshToken(user_id=request.form.get('username'),
                                 scopes=['360noscope'])

    # Persist the refresh token so we can renew it later
    refresh_tokens.append(refresh_token)

    shortlived_token = ShortlivedToken.from_refresh_token(refresh_token)
    log.info("Generated token with payload: {}".format(shortlived_token))

    return jsonify({
        'token': shortlived_token.dump(),
        'refreshToken': refresh_token.token
    })


@app.route('/logout', methods=['POST'])
@parse_auth_header(ShortlivedToken)
@auth_required()
def logout():

    # Find the associated refresh token
    for refresh_token in refresh_tokens:
        if refresh_token.id == g.token.refresh_token_id:
            break # Found the associated token

    else: # nobreak
        # Couldn't find the token. Maybe it has been revoked.
        abort(401)

    # Remove the refresh token from the store. It has now been revoked.
    refresh_tokens.remove(refresh_token)

    return jsonify({
        'status': 'success'
    })


@app.route('/renew', methods=['POST'])
def renew():
    token_string = request.form.get('refreshToken')

    # Find the refresh token in the store
    for refresh_token in refresh_tokens:
        if refresh_token.token == token_string:
            break # Found the token that we need.

    else: # nobreak
        # Couldn't find the token. Oops
        abort(401)

    # Make a new shortlived token
    shortlived_token = ShortlivedToken.from_refresh_token(refresh_token)
    log.info("Generated token with payload: {}".format(shortlived_token))

    # Respond to the client with the new token
    return jsonify({
        'token': shortlived_token.dump()
    })


@app.route('/protected', methods=['POST'])
@parse_auth_header(ShortlivedToken)
@auth_required()
def protected():
    return "Welcome, {}".format(g.token.user_id)


if __name__ == '__main__':
    app.run(debug=True, port=5005)

Using this example, you should be able to exchange credentials for a refresh token, a shortlived token, and perform subsequent renewalls and revokations.

In the above example, the functions parse_auth_header() and auth_required() are used on the protected endpoint and logout endpoint.

API

class twopi_flask_utils.token_auth.ShortlivedTokenMixin(expiry=None, issuer=None, subject=None, audience=None, not_before=None, issued_at=None)

A base class for implementing a short-lived token using JWTs

Parameters:
  • expirydatetime: The expiry of this token
  • issuerstring
  • subjectstring
  • audiencestring
  • not_beforedatetime: A datetime of when the token becomes valid for use
  • issued_atdatetime: When the token was issued. This value is overwritten during dump()
class TokenSchema(*, only: Union[Sequence[str], Set[str]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Dict[KT, VT] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: str = None)

The schema to use to serialize/de-serialize JWT’s with.

dump(secret=None)

Dump the token into a stringified JWT.

Parameters:secret – The secret to sign the JWT with. If this is omitted, the secret will be sourced from current_app.config['SECRET_KEY']
Returns:The stringified JWT.
classmethod from_refresh_token(refresh_token)

Given a refresh token, return an instance of ShortLivedTokenMixin configured using information from refresh_token.

Parameters:refresh_token – A refresh token instance.
Returns:A new ShortLivedToken instance.

Warning

You must implement this method

classmethod load(token_string, secret=None, issuer=None, audience=None)

Load from a JWT (token_string)

Parameters:
  • token_string – The raw string to load from
  • secret – The secret that the JWT was signed with to check validity. If this is omitted, the secret will be sourced from current_app.config['SECRET_KEY']
  • issuer – The issuer the JWT decode should expect
  • audience – The audience the JWT decode should expect
Returns:

A de-serialized ShortLivedToken instance.

twopi_flask_utils.token_auth.auth_required()

Force authentication on an endpoint. Checks if g.token is not None, and returns a 401 if it is.

Example:

@app.route('/auth-required')
@auth_required()
def my_endpoint():
    return "Hello World"
twopi_flask_utils.token_auth.parse_auth_header(token_cls, auth_header=True, query_string=True, secret=None)

A decorator to extract a token from either the Authorization header OR the query string parameter ?token=.

Parameters:
  • token_cls – An instance of ShortlivedTokenMixin OR a class which implements the classmethod load(token_string, secret). An instance will be available on g.token.
  • auth_header – (optional, bool) Extract the token from the auth header (Default: True)
  • query_string – (optional, bool) Extract the token from the query string (Default: True)
  • secret – (optional) A secret to pass to token_cls.load(raw, secret)

Any wrapped function will be able to access both g.token and g.raw_token to read the token_cls instance and raw token string respectively.

Authorization header expects tokens in the format of Bearer <token string>