Fivetran, Data Lineage, and you.


Introduction

Since its founding in 2012, Fivetran has formed a core component of the data stack for many organizations. This is especially true for data teams who have adopted ELT architectures in which source data is copied directly into a warehouse before performing transformations.

In addition to its great connectors, Fivetran exposes a fantastic API that allows users to programmatically control much of their infrastructure. Today we are going to walk through everything you need to extract data lineage from Fivetran.

Background

There are a few Fivetran concepts we need to be familiar with before proceeding.

Sources & Destinations

Sources are the places we intend to copy data from, such as Postgres, Airtable, and Salesforce, while destinations are the places we copy data into, like Snowflake, BigQuery, and Redshift.

Source -> Destination

Connectors

Connectors are responsible for actually synchronizing data from a source to a destination. How they accomplish that doesn't matter here; all we need to know is that they copy data from Source -> Destination.

You can also collect sets of related connectors together into something called a Group. One common strategy is to create one group of connectors for production, and another for staging or development.

The Game Plan

Our goal is to extract data lineage from the Fivetran API for each connector.
Conceptually, a connector should tell us which tables and columns in its source correspond to which tables and columns in its destination. So if we can find all of our connectors, we can find the source and destination for each one, then determine which table and column in the destination corresponds to each counterpart in the source.

Fivetran identifies every column in a source or destination through a combination of schema, table, and column identifiers. The last step will be to find each of these values for both the source and the destination of every connector.

Thankfully Fivetran has provided a handy visualization describing the relationship between source and destination identifiers through different API endpoints.

To Recap

Connector Group -> Connector -> Schema (source and destination) -> Table -> Column
  1. Query for all of the connector groups
  2. For each connector group, query connectors
  3. For each connector, query schema metadata
  4. For each schema, query table metadata
  5. For each table, query column metadata
  6. Tie it all together.

Execution

💡
If at any point you'd like to get your hands dirty, check out the interactive Fivetran API link in the references below.

For the purposes of this guide, we will use the Python requests library to communicate with the Fivetran API, though any language (or a tool like curl) would work just as well. If you're following along, make sure requests is installed with pip install requests, and have your Fivetran API key and secret handy (see Getting an API key in the references). We are also going to use a shared session to make scripting a little easier.

Here is a little code snippet to get you started:

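import os

import requests

session = requests.Session()
# Fivetran uses HTTP Basic auth: the API key is the username and the API secret the password.
# The environment variable names are only a convention for this example.
session.auth = (os.environ["FIVETRAN_API_KEY"], os.environ["FIVETRAN_API_SECRET"])
session.headers.update({"Accept": "application/json"})
session.params.update({"limit": 1000})  # Results per page; Fivetran accepts values from 1 to 1000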

The Fivetran API is also paginated, meaning it won't necessarily return all of the results in a single request. If a response includes a next_cursor value under its data field, there are more results to fetch; passing that value back as the cursor query parameter returns the next page. To keep things simple, we ignore pagination in the code snippets below.
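If you do need every page, a small loop can keep requesting until next_cursor disappears. This is a minimal sketch: get_all_items is a hypothetical helper name, and it assumes session is a requests.Session set up like the snippet above (any object with a compatible get method works), with the cursor sent back as the cursor query parameter.

```python
from typing import Any, Dict, List, Optional


def get_all_items(session: Any, url: str) -> List[Dict[str, Any]]:
    """Fetch every page from a paginated Fivetran list endpoint (hypothetical helper).

    `session` is assumed to be a requests.Session configured as shown earlier.
    """
    items: List[Dict[str, Any]] = []
    cursor: Optional[str] = None
    while True:
        # The first request has no cursor; later requests send back next_cursor
        params = {"cursor": cursor} if cursor else {}
        response = session.get(url, params=params)
        response.raise_for_status()
        data = response.json()["data"]
        items.extend(data.get("items", []))
        cursor = data.get("next_cursor")
        if not cursor:  # No next_cursor means this was the last page
            return items
```

For example, get_all_items(session, "https://api.fivetran.com/v1/groups") would return the groups from every page. Because requests merges per-request params with the session's params, the limit set on the session still applies to each page.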

Connector Groups

Each group has a name and a human-readable id like "decent_dropsy". We need to find the ids of all the groups associated with our account. Thankfully, there's already an API endpoint that lists all of our groups.
To do so, we will need to

GET https://api.fivetran.com/v1/groups

A successful result will return something that looks like this

{
    "code": "Success",
    "data": {
        "items": [
            {
                "id": "projected_sickle",
                "name": "Staging",
                "created_at": "2018-12-20T11:59:35.089589Z"
            },
            {
                "id": "schoolmaster_heedless",
                "name": "Production",
                "created_at": "2019-01-08T19:53:52.185146Z"
            }
        ],
        "next_cursor": "eyJza2lwIjoyfQ"
    }
}

Now, in Python, we can query for groups as follows:

groups_url = "https://api.fivetran.com/v1/groups"

groups = session.get(groups_url).json()['data']['items']
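Since groups often separate environments (for example Production and Staging, as mentioned earlier), you may only want lineage for one of them. Here is a sketch of filtering by name, run against the sample response above; groups_named is a hypothetical helper.

```python
from typing import Any, Dict, List


def groups_named(groups: List[Dict[str, Any]], name: str) -> List[Dict[str, Any]]:
    """Keep only the groups whose name matches exactly (hypothetical helper)."""
    return [group for group in groups if group.get("name") == name]


# Sample items copied from the /v1/groups response shown above
sample_groups = [
    {"id": "projected_sickle", "name": "Staging", "created_at": "2018-12-20T11:59:35.089589Z"},
    {"id": "schoolmaster_heedless", "name": "Production", "created_at": "2019-01-08T19:53:52.185146Z"},
]

production = groups_named(sample_groups, "Production")
print([group["id"] for group in production])  # ['schoolmaster_heedless']
```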

Connectors

Great, now that we have all of the groups, we need to find the connectors associated with each group. To do that we will

GET https://api.fivetran.com/v1/groups/{group_id}/connectors 

It returns results which look like this

{
    "code": "Success",
    "data": {
        "items": [
            {
                "id": "iodize_impressive",
                "group_id": "projected_sickle",
                "service": "salesforce",
                "service_version": 1,
                "schema": "salesforce",
                "connected_by": "concerning_batch",
                "created_at": "2018-07-21T22:55:21.724201Z",
                "succeeded_at": "2018-12-26T17:58:18.245Z",
                "failed_at": "2018-08-24T15:24:58.872491Z",
                "sync_frequency": 60,
                "status": {
                    "setup_state": "connected",
                    "sync_state": "paused",
                    "update_state": "delayed",
                    "is_historical_sync": false,
                    "tasks": [],
                    "warnings": []
                }
            }
        ],
        "next_cursor": "eyJza2lwIjoxfQ"
    }
}

Now in Python:

connectors = []
for group in groups:
    group_id = group['id']
    url = f"https://api.fivetran.com/v1/groups/{group_id}/connectors"
    group_connectors = session.get(url).json()['data']['items']
    connectors.extend(group_connectors)
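Each connector item also records its group, its service, and a status block. The sketch below indexes connectors by id and skips any whose status.setup_state isn't "connected". The filtering rule is our own assumption, so adjust it to taste, and index_connectors is a hypothetical helper. The sample connector is kept even though its sync_state is "paused", because only setup_state is checked.

```python
from typing import Any, Dict, List


def index_connectors(connectors: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Map connector id -> group, service, and schema for connectors that finished setup."""
    indexed: Dict[str, Dict[str, Any]] = {}
    for connector in connectors:
        # setup_state is nested under "status"; sync_state (e.g. "paused") is ignored here
        if connector.get("status", {}).get("setup_state") != "connected":
            continue
        indexed[connector["id"]] = {
            "group_id": connector["group_id"],
            "service": connector["service"],
            "schema": connector["schema"],
        }
    return indexed


# Trimmed copy of the sample connector above, plus a hypothetical one still being set up
sample_connectors = [
    {"id": "iodize_impressive", "group_id": "projected_sickle", "service": "salesforce",
     "schema": "salesforce", "status": {"setup_state": "connected", "sync_state": "paused"}},
    {"id": "hypothetical_connector", "group_id": "projected_sickle", "service": "postgres",
     "schema": "postgres", "status": {"setup_state": "incomplete"}},
]

print(index_connectors(sample_connectors))
# {'iodize_impressive': {'group_id': 'projected_sickle', 'service': 'salesforce', 'schema': 'salesforce'}}
```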

Schemas

Alright, now we're cooking with gas. With all of the connectors in hand, it's time to start pulling column-level data lineage. To do that, we will begin by querying the Metadata API's schemas endpoint with

GET https://api.fivetran.com/v1/metadata/connectors/{connector_id}/schemas

Again, results will look something like this

{
  "code": "Success",
  "data": {
    "items": [
      {
        "id": "bWFpbl9wdWJsaWM",
        "name_in_source": "Main Public",
        "name_in_destination": "main_public"
      },
      {
        "id": "dXNlcl9zZXR0aW5ncw",
        "name_in_source": "Local backup",
        "name_in_destination": "local_backup"
      }
    ],
    "next_cursor": "eyJza2lwIjoxfQ"
  }
}

For each connector, we can now build column-level lineage between its source and destination. We iterate over connectors, taking the connector_id from the id field of each element; the snippets below show the steps for a single connector.

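connector_id = connectors[0]['id']  # Repeat this and the following steps for every connector

schema_map = {}
url = f"https://api.fivetran.com/v1/metadata/connectors/{connector_id}/schemas"

schemas = session.get(url).json()['data']['items']
for schema in schemas:
    # Key each schema by its id; what remains are its source and destination names
    schema_map[schema.pop('id')] = schema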

Each schema can have multiple tables, and each schema's id matches the parent_id of its tables in the tables response, as we will see in a second.

Tables

Now that we have schemas we will do basically the same thing for the tables endpoint. In this case we will be hitting

GET https://api.fivetran.com/v1/metadata/connectors/{connector_id}/tables

With a result looking like

{
  "code": "Success",
  "data": {
    "items": [
      {
        "id": "NjUwMTU",
        "parent_id": "bWFpbl9wdWJsaWM",
        "name_in_source": "User Accounts",
        "name_in_destination": "user_accounts"
      },
      {
        "id": "NjUwMTY",
        "parent_id": "bWFpbl9wdWJsaWM",
        "name_in_source": "User Subscriptions",
        "name_in_destination": "user_subscriptions"
      },
      {
        "id": "NjUwMTW",
        "parent_id": "bWFpbl9wdWJsaWM",
        "name_in_source": "Account Details",
        "name_in_destination": "account_details"
      }
    ],
    "next_cursor": "YUWEudlwIjoxkK"
  }
}

Things are a little more complicated because we have to deal with both id and parent_id, where parent_id is, as a reminder, the id of the table's schema. This time we will use a nested dictionary of the form Dict[<schema_id>, Dict[<table_id>, <names dictionary>]].

table_map = {}

url = f"https://api.fivetran.com/v1/metadata/connectors/{connector_id}/tables"

tables = session.get(url).json()['data']['items']
for table in tables:
    schema_id = table.pop('parent_id')
    table_id = table.pop('id')

    # Create the inner dictionary for this schema on first use, then store the table's names
    table_map.setdefault(schema_id, {})[table_id] = table
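If you prefer, collections.defaultdict builds the same nested structure without setdefault. The sketch below runs against the sample schema and table items shown above and uses each table's parent_id to look up its schema's names. The sample_ variables are just illustration data copied from those responses.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

# Sample items copied from the /schemas and /tables responses shown above
sample_schemas = [
    {"id": "bWFpbl9wdWJsaWM", "name_in_source": "Main Public", "name_in_destination": "main_public"},
    {"id": "dXNlcl9zZXR0aW5ncw", "name_in_source": "Local backup", "name_in_destination": "local_backup"},
]
sample_tables = [
    {"id": "NjUwMTU", "parent_id": "bWFpbl9wdWJsaWM",
     "name_in_source": "User Accounts", "name_in_destination": "user_accounts"},
    {"id": "NjUwMTY", "parent_id": "bWFpbl9wdWJsaWM",
     "name_in_source": "User Subscriptions", "name_in_destination": "user_subscriptions"},
    {"id": "NjUwMTW", "parent_id": "bWFpbl9wdWJsaWM",
     "name_in_source": "Account Details", "name_in_destination": "account_details"},
]

schema_map = {schema["id"]: schema for schema in sample_schemas}

# defaultdict(dict) creates the inner dictionary the first time a schema id is used
table_map: Dict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict)
for table in sample_tables:
    table_map[table["parent_id"]][table["id"]] = table

# Resolve each table's parent_id to its schema to get qualified source/destination names
table_pairs: List[Tuple[str, str]] = []
for schema_id, tables_in_schema in table_map.items():
    schema = schema_map[schema_id]
    for table in tables_in_schema.values():
        table_pairs.append((
            f'{schema["name_in_source"]}.{table["name_in_source"]}',
            f'{schema["name_in_destination"]}.{table["name_in_destination"]}',
        ))

print(table_pairs)
```

Note that Local backup has no tables in the sample, so it never gets an entry in table_map.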

As with schemas, each table has multiple associated columns. We will have to play a similar game one more time.

Columns

Now we will use the columns endpoint

GET https://api.fivetran.com/v1/metadata/connectors/{connector_id}/columns

With a corresponding json response

{
  "code": "Success",
  "data": {
    "items": [
      {
        "id": "NTY4ODgzNDI",
        "parent_id": "NjUwMTU",
        "name_in_source": "id",
        "name_in_destination": "id",
        "type_in_source": "Integer",
        "type_in_destination": "Integer",
        "is_primary_key": true,
        "is_foreign_key": false
      },
      {
        "id": "NTY4ODgzNDM",
        "parent_id": "NjUwMTU",
        "name_in_source": "FirstName",
        "name_in_destination": "first_name",
        "type_in_source": "String",
        "type_in_destination": "Text",
        "is_primary_key": false,
        "is_foreign_key": false
      }
    ],
    "next_cursor": "YUWEudlwIjoxkK"
  }
}

This time the parent_id refers to a table rather than a schema, so we will build a dictionary of the form Dict[<table_id>, Dict[<column_id>, <names dictionary>]].

column_map = {}
url = f"https://api.fivetran.com/v1/metadata/connectors/{connector_id}/columns"

columns = session.get(url).json()['data']['items']
for column in columns:
    table_id = column.pop('parent_id')
    column_id = column.pop('id')

    # Group columns under the id of the table they belong to
    column_map.setdefault(table_id, {})[column_id] = column
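The column items carry more than names. As a sketch of what you can do with that metadata, the helpers below (hypothetical names) list columns whose type changes between source and destination and collect primary keys, using the two sample columns above. They only read the name, type, and key fields, so they work on the columns list either before or after the loop above pops id and parent_id.

```python
from typing import Any, Dict, List, Tuple


def type_changes(columns: List[Dict[str, Any]]) -> List[Tuple[str, str, str]]:
    """List (source name, source type, destination type) for columns whose type differs."""
    return [
        (c["name_in_source"], c["type_in_source"], c["type_in_destination"])
        for c in columns
        if c.get("type_in_source") != c.get("type_in_destination")
    ]


def primary_keys(columns: List[Dict[str, Any]]) -> List[str]:
    """Destination names of the columns flagged as primary keys."""
    return [c["name_in_destination"] for c in columns if c.get("is_primary_key")]


# The two column items from the sample /columns response above
sample_columns = [
    {"id": "NTY4ODgzNDI", "parent_id": "NjUwMTU", "name_in_source": "id", "name_in_destination": "id",
     "type_in_source": "Integer", "type_in_destination": "Integer",
     "is_primary_key": True, "is_foreign_key": False},
    {"id": "NTY4ODgzNDM", "parent_id": "NjUwMTU", "name_in_source": "FirstName",
     "name_in_destination": "first_name", "type_in_source": "String", "type_in_destination": "Text",
     "is_primary_key": False, "is_foreign_key": False},
]

print(type_changes(sample_columns))  # [('FirstName', 'String', 'Text')]
print(primary_keys(sample_columns))  # ['id']
```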

Bringing It Together

We've come a long way, including querying for all of the tables and columns being managed by Fivetran alongside a wealth of metadata about our columns, like their types, and whether they are primary or foreign keys. What's left is to bring together the final relationship between all of the names in sources and the corresponding destination names. To do that we will run one final piece of python code to generate complete names consisting of {schema}.{table}.{column}.

column_lineage_map = {}

for schema_id, schema_names in schema_map.items():
    source_schema = schema_names['name_in_source']
    destination_schema = schema_names['name_in_destination']

    # Schemas without tables (and tables without columns) have no map entry, so default to {}
    for table_id, table_names in table_map.get(schema_id, {}).items():
        source_table = table_names['name_in_source']
        destination_table = table_names['name_in_destination']

        for column_id, column_names in column_map.get(table_id, {}).items():
            source_column = column_names['name_in_source']
            destination_column = column_names['name_in_destination']

            full_source_name = f"{source_schema}.{source_table}.{source_column}"
            full_destination_name = f"{destination_schema}.{destination_table}.{destination_column}"
            column_lineage_map[full_source_name] = full_destination_name

And that's it: you can now explicitly relate every source column handled by a connector to its corresponding destination column.
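The snippets above handle one connector at a time and skip pagination. Here is one way to run the whole walk for every connector. It is a sketch, not a drop-in tool: it assumes a session configured as in the setup snippet, follows next_cursor via the cursor query parameter, and keys the result by connector id so identically named tables from different connectors don't overwrite each other. All function names are hypothetical.

```python
from typing import Any, Dict, List, Optional

BASE_URL = "https://api.fivetran.com/v1"


def fetch_items(session: Any, url: str) -> List[Dict[str, Any]]:
    """Return every item from a paginated endpoint by following next_cursor."""
    items: List[Dict[str, Any]] = []
    cursor: Optional[str] = None
    while True:
        response = session.get(url, params={"cursor": cursor} if cursor else {})
        response.raise_for_status()
        data = response.json()["data"]
        items.extend(data.get("items", []))
        cursor = data.get("next_cursor")
        if not cursor:
            return items


def connector_column_lineage(session: Any, connector_id: str) -> Dict[str, str]:
    """Map source schema.table.column names to destination names for one connector."""
    metadata = f"{BASE_URL}/metadata/connectors/{connector_id}"
    schemas = {s["id"]: s for s in fetch_items(session, f"{metadata}/schemas")}
    tables = {t["id"]: t for t in fetch_items(session, f"{metadata}/tables")}

    lineage: Dict[str, str] = {}
    for column in fetch_items(session, f"{metadata}/columns"):
        # Walk column -> table -> schema through the parent_id references
        table = tables.get(column["parent_id"])
        schema = schemas.get(table["parent_id"]) if table else None
        if table is None or schema is None:
            continue  # Skip columns whose parent table or schema was not returned
        parts = (schema, table, column)
        source = ".".join(part["name_in_source"] for part in parts)
        destination = ".".join(part["name_in_destination"] for part in parts)
        lineage[source] = destination
    return lineage


def all_column_lineage(session: Any) -> Dict[str, Dict[str, str]]:
    """Walk groups -> connectors and collect column lineage per connector id."""
    result: Dict[str, Dict[str, str]] = {}
    for group in fetch_items(session, f"{BASE_URL}/groups"):
        connectors_url = f"{BASE_URL}/groups/{group['id']}/connectors"
        for connector in fetch_items(session, connectors_url):
            result[connector["id"]] = connector_column_lineage(session, connector["id"])
    return result
```

Instead of walking down from schemas, this walks up from each column through parent_id, so a schema without tables or a table without columns never causes a lookup error.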

What's Next

If you've gotten this far, you should have everything you need to extract and visualize data lineage in Fivetran. If you're looking for something pre-built, check out the grai-source-fivetran package, which handles this for you. If you're interested in exploring the source code, check it out here.

References

  1. Fivetran API Docs
  2. Interactive Fivetran API Reference
  3. Getting an API key
  4. Fivetran destination docs
  5. Fivetran Metadata API docs