Implementing role and attribute based access control in SQLAlchemy with Cerbos

Published by Sam Lock on September 27, 2022
image

If you maintain an application that handles any state at all, it's likely that you've had to figure out how to both store that state, as well as how to load it into the application layer and act on it in any which way your business logic requires.

Perhaps, in your case, a lot of the computational "heavy lifting" is done by the database, and the application is just an abstraction layer where you write your database queries. Or maybe on the contrary, the database is just a basic store which provides the data for the application to manage all of the tricky logic itself.

Regardless, there's many ways to build an application (as the common idiom doesn't go). Application design is a vast and complex process, but one thing we can do to make that process more manageable is to use tools that take a lot of the implementation complexity away...

Enter SQLAlchemy

SQLAlchemy has established itself as one of the standards in database abstraction layers in the Python world. It offers two distinct ways of communicating with the DB; via it's lower-level Core SQL abstraction toolkit, or via it's ORM component, which extends Core to offer some convenient, higher-level abstractions.

What we're building

In this run-through, we'll be building an application that manages a "Contact directory", enabling users to keep track of their contacts, along with useful information such as employment information (current company etc).

We're going to explore how to model our data, map it to Cerbos entities, and interact with it in a clean, efficient and reusable way.

We'll be building a Python FastAPI server and securing it using the following Cerbos APIs:

  • CheckResources: e.g. can User X from the Sales department access Contact Y?
  • PlanResources: e.g. which contacts can User X from the Marketing department access?

The full source code for this demo can be found in our repo here.

Prerequisites

The database

Setting up our models

We have the following entities within our application:

  • User: the person interacting with the application
  • Contact: a person within a User's directory (a User can have many Contacts)
  • Company: the company that a Contact is currently employed with (a Company can have many Contacts)

In order to persist and manage these models, we need to be able to represent them in code in a way that can be mapped to our database layer. This is where SQLAlchemy comes in.

SQLAlchemy allows us to represent our relational database tables as classes, with attributes representing the columns of those tables. An object instance of one of these classes will represent a single row in the table. An example is shown below:

from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "user"

    id = Column(String, primary_key=True)
    username = Column(String(255))
    email = Column(String(255))
    # ...

It also allows us to go a step further, and model relationships between these tables (via variations of table joins). In our case, we want to be able to model the one-to-many relationships mentioned above:

class User(Base):
    __tablename__ = "user"

    # ...
    contacts = relationship("Contact", back_populates="owner")

class Contact(Base):
    __tablename__ = "contact"

    id = Column(String, primary_key=True)
    # ...
    owner_id = Column(String, ForeignKey("user.id"))
    owner = relationship("User", back_populates="contacts", lazy="joined")

You can see how we relate the two tables via the relationship function. In setting a relationship field on each linked class, we establish a bidirectional relationship between the objects (with the "reverse" side being a many-to-one). In this particular case, the ForeignKey placed on the child table infers the many-to-one side, and as such, allows for child table objects to reference the parent via child.owner/child.owner_id. The lazy="joined" parameter indicates to SQLAlchemy that we'd like to lazily load the related object at attribute access time.

The full table definitions can be found in this module.

You can see how SQLAlchemy ORM entity objects can then be used to reference each another in code:

from sqlalchemy import select

# Session is a SQLAlchemy sessionmaker instance
with Session() as s:
    user = s.scalars(select(User).where(User.username == "gandalf")).first()
    user.email  # "greybeard99@midearth.com"

    # Note, in order to reference contacts with a `lazy` loading pattern, the
    # attribute lookup needs to occur in the context of a session - hence it's
    # in the Session() context manager scope.
    contact = user.contacts[0]
    contact.user_id == user.id  # True

Check out the excellent SQLAlchemy documentation for more information on relationships.

Connecting to our database

SQLAlchemy is a wonderful abstraction layer between Python and a whole array of different relational databases. By specifying the "dialect" when connecting to a DB engine, we tell it which relational database it is connecting to.

For our demo, we’ll be setting up a simple, ephemeral SQLite instance. We won’t even persist it to disk; each time the application is started, it’ll build the DB in memory and populate it with a migration script.

We create the engine like so:

from sqlalchemy import create_engine
from sqlalchemy.pool import StaticPool

engine = create_engine(
    "sqlite://",  # the absence of a specified URL infers a `:memory:` database (e.g. no disk persistence)
    connect_args={"check_same_thread": False},  # in FastAPI, when using sync (def) functions, more than one thread could interact with the database
                                                # for the same request, so we need to make SQLite know that it should allow that
    poolclass=StaticPool,  # Use a static pool to persist state with an in memory instance of sqlite
)

Tables and metadata

To start using the SQLAlchemy Expression Language, we will want to have Table objects constructed that represent all of the database tables we are interested in working with. Each Table may be declared, meaning we explicitly spell out in source code what the table looks like, or may be reflected, which means we generate the object based on what’s already present in a particular database.

Whether we will declare or reflect our tables, we start out with a collection that will be where we place our tables known as the MetaData object. This object is essentially a facade around a Python dictionary that stores a series of Table objects keyed to their string name.

Our classes above inherit from a base class generated from a call to declarative_base(). This "declarative" method allows us to declare user-defined classes and Table metadata at once. Each time a class inherits from this Base class, it is added to this collection, or registry. The following call will generate the database tables from the metadata:

Base.metadata.create_all(engine)

Populating the database

We can then generate a session from our engine instance, and use it to populate our newly generated tables:

with Session() as s:
    coca_cola = Company(name="Coca Cola")
    s.add(coca_cola)
    s.commit()

    john = User(
        name="John",
        username="john",
        email="john@cerbos.demo",
        role="user",
        department="Sales",
    )
    s.add(john)
    s.commit()

    s.add(Contact(
            first_name="Nick",
            last_name="Smyth",
            marketing_opt_in=True,
            is_active=True,
            owner=john,
            company=coca_cola,
    ))
    s.commit()

You can see in the example above (in the Contact definition) how we can define relationships by referencing instances of the table classes.

Again, the full source code for this section can be found here.

The API

We now have a database which can be declared and populated on demand, and models which allow us to interact with it. The next step is to build an API layer to expose the data, and to secure the endpoints and resources with Cerbos.

We'll be creating our server with FastAPI. The source code for this section can be found here.

Dependency injection with FastAPI dependables

FastAPI allows you to define callables called "dependables", which are functions that take all of the same arguments as a "path operation function" and return whatever we might require for the handler. The Depends(fn) class takes the callable and on execution will return the default argument, if required. We define a few dependables which we can use across our endpoints.

Firstly, one to retrieve the cerbos Principal instance from the username (which in itself is retrieved via the FastAPI provided HTTPBasic dependable):

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials

security = HTTPBasic()

def get_principal(credentials: HTTPBasicCredentials = Depends(security)) -> Principal:
    username = credentials.username

    with Session() as s:
        # retrieve `user` from the DB to access the attributes
        user = s.scalars(select(User).where(User.username == username)).first()
        if user is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found",
            )

    return Principal(user.id, roles={user.role}, attr={"department": user.department})

This can then be used on all endpoints to authenticate the user, and then assert that the user exists in the database:

@app.get("/contacts")
def get_contacts(p: Principal = Depends(get_principal)):
    # do something with the principal "p"

We then create a dependable which attempts to retrieve the Contact from the database based on a path parameter in the URL: contact_id:

def get_db_contact(contact_id: str) -> Contact:
    with Session() as s:
        contact = s.scalars(select(Contact).where(Contact.id == contact_id)).first()
        if contact is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Contact not found",
            )
    return contact

This in turn can then be nested in another dependable which attempts to return the Contact Resource instance:

def get_resource_from_contact(
    db_contact: Contact = Depends(get_db_contact),
) -> Resource:
    return Resource(
        id=db_contact.id,
        kind="contact",
        attr=jsonable_encoder(
            {n.name: getattr(db_contact, n.name) for n in Contact.__table__.c}
        ),
    )

These can then be used to attempt to retrieve a Cerbos Resource, or a SQLAlchemy Contact instance respectively, on routes which include the contact_id path parameter:

@app.delete("/contacts/{contact_id}")
def delete_contact(
    r: Resource = Depends(get_resource_from_contact),
    p: Principal = Depends(get_principal),
):
    # do something with the resource

@app.get("/contacts/{contact_id}")
def get_contact(
    db_contact: Contact = Depends(get_db_contact),
    p: Principal = Depends(get_principal)
):
    # optionally, call the dependable direct to retrieve the resource from the db Contact instance
    resource = get_resource_from_contact(db_contact)

Defining our API schema

Some of our routes will require specific parameters in the payload in order to carry out the given request. For example, we might need routes for creating or updating new or existing Contacts.

FastAPI provides a nice interface to enforce request schema via "Pydantic" models:

from pydantic import BaseModel

class ContactSchema(BaseModel):
    first_name: str
    last_name: str
    owner_id: str
    company_id: str
    is_active: bool = False
    marketing_opt_in: bool = False

    class Config:
        # tell the Pydantic model to read the data even if it is not a dict, but an ORM model
        # (or any other arbitrary object with attributes)
        orm_mode = True

Once we've defined these schema models, we can use them in the FastAPI routes:

@app.post("/contacts/new")
def create_contact(
    contact_schema: ContactSchema, p: Principal = Depends(get_principal)
):
    with CerbosClient(host="http://localhost:3592") as c:
        if not c.is_allowed(
            "create",
            p,
            Resource(
                id="new",
                kind="contact",
            ),
        ):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="Unauthorized"
            )

    db_contact = Contact(**contact_schema.dict())
    with Session() as s:
        s.add(db_contact)
        s.commit()
        s.refresh(db_contact)

    return {"result": "Created contact", "contact": db_contact}

FastAPI will automatically validate the input payload to ensure required fields are present, and types are correct (as well as other optional checks). We can then use the schema model attributes to generate SQLAlchemy models as you can see above.

The schema for our demo can be found in this module.

Protecting the routes

Now we have our dependables and API schema defined, we can start to define the routes and secure them using Cerbos.

We can make granular checks against specific principal:resource:action mappings using Cerbos' CheckResources API (via the is_allowed method):

@app.get("/contacts/{contact_id}")
def get_contact(
    db_contact: Contact = Depends(get_db_contact), p: Principal = Depends(get_principal)
):
    r = get_resource_from_contact(db_contact)

    with CerbosClient(host="http://localhost:3592") as c:
        if not c.is_allowed("read", p, r):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="Unauthorized"
            )

    return db_contact

However, sometimes, we want to establish which resources a Principal has access to. To do this, we can use the PlanResources API.

The query planner

If we provide Cerbos with a Principal and a description of the resource they're trying to access (ResourceDesc), we can ask it for a query plan.

The PlanResources call returns one of the following:

  • KIND_ALWAYS_ALLOWED
  • KIND_ALWAYS_DENIED
  • KIND_CONDITIONAL

In the final case, it'll also return an abstract syntax tree (AST) of the condition that must be satisfied to allow the action:

@app.get("/contacts")
def get_contacts(p: Principal = Depends(get_principal)):
    with CerbosClient(host="http://localhost:3592") as c:
        rd = ResourceDesc("contact")

        # Get the query plan for "read" action
        plan = c.plan_resources("read", p, rd)
        print(json.dumps(plan.to_dict(), sort_keys=False, indent=4))

Cerbos provides a SQLAlchemy adapter library with an API that takes the query plan response, and uses it to generate a SQLAlchemy query object. Continuing below:

    query = get_query(
        plan,
        Contact,
        {
            "request.resource.attr.owner_id": User.id,
            "request.resource.attr.department": User.department,
            "request.resource.attr.is_active": Contact.is_active,
            "request.resource.attr.marketing_opt_in": Contact.marketing_opt_in,
        },
        [(User, Contact.owner_id == User.id)],
    )

    # Optionally reduce the returned columns (`with_only_columns` returns a new `select`)
    # NOTE: this is wise to do as standard, to avoid implicit joins generated by sqla `relationship()` usage, if present
    query = query.with_only_columns(
        Contact.id,
        Contact.first_name,
        Contact.last_name,
        Contact.is_active,
        Contact.marketing_opt_in,
    )
    print(query.compile(compile_kwargs={"literal_binds": True}))

The provided get_query function accepts the following parameters, respectively:

  1. query plan
  2. a primary SQLAlchemy Table or ORM DeclarativeMeta type (the FROM table part of the resulting query)
  3. the "attribute map" - responsible for mapping the Cerbos resource attribute strings to the associated SQLAlchemy columns (type Column or ORM InstrumentedAttribute)
  4. OPTIONAL: list of explicit table joins - required only if more than one table specified in primary table + attribute map

It returns a SQLAlchemy Selectable, which can be further extended/reduced, and then used to query the database:

    # ...
    with Session() as s:
        rows = s.execute(query).fetchall()

    return rows

Run the server

Now we understand how everything works, let's fire up the server and the Cerbos PDP, and test it out.

Clone the repo:

git clone git@github.com:cerbos/python-sqlalchemy-cerbos.git
cd python-sqlalchemy-cerbos

Start up the Cerbos PDP instance docker container:

cd cerbos
./start.sh

Install Python dependencies:

# from project root
pdm install

Start the FastAPI dev server:

pdm run demo

Example requests

Get all permitted contacts

curl http://john@localhost:8000/contacts

Get a single contact

Sales user, contact owned => 200 OK

curl -i http://john@localhost:8000/contacts/1

Sales user, contact not owned or active => 403 Forbidden

curl -i http://john@localhost:8000/contacts/4

Create a contact

Sales user => 200 OK

curl -i http://john@localhost:8000/contacts/new \
  -H 'Content-Type: application/json' \
  -X POST \
  -d '{"first_name": "frodo", "last_name": "baggins", "owner_id": "2", "company_id": "2"}'

Marketing user (e.g. geri) => 403 Forbidden

Delete a contact

Contact owner => 200 OK

curl -i http://john@localhost:8000/contacts/1 -X DELETE

Non-owner => 403 Forbidden

curl -i http://john@localhost:8000/contacts/3 -X DELETE

If you have any questions or feedback, or to chat to us and other like-minded technologists, please join our Slack community!

Book a free Policy Workshop to discuss your requirements and get your first policy written by the Cerbos team