Load data from an API
In this section, we will retrieve and load data from the GitHub API into DuckDB. Specifically, we will load issues from our dlt-hub/dlt repository. We picked DuckDB as our destination because it is a lightweight, in-process database that is easy to set up and use.
Before we start, make sure you have installed dlt with the DuckDB dependency:
pip install "dlt[duckdb]"
Need help with this tutorial? Join our Slack community for quick support.
Create a pipeline
First, we need to create a pipeline. Pipelines are the main building blocks of dlt and are used to load data from sources to destinations. Open your favorite text editor and create a file called github_issues.py. Add the following code to it:
import dlt
from dlt.sources.helpers import requests
# Specify the URL of the API endpoint
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
# Make a request and check if it was successful
response = requests.get(url)
response.raise_for_status()
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="duckdb",
    dataset_name="github_data",
)
# The response contains a list of issues
load_info = pipeline.run(response.json(), table_name="issues")
print(load_info)
Here's what the code above does:
- It makes a request to the GitHub API endpoint and checks if the response is successful.
- Then it creates a dlt pipeline with the name github_issues and specifies that the data should be loaded to the duckdb destination and the github_data dataset. Nothing gets loaded yet.
- Finally, it runs the pipeline with the data from the API response (response.json()) and specifies that the data should be loaded to the issues table. The run method returns a LoadInfo object that contains information about the loaded data.
Run the pipeline
Save github_issues.py and run the following command:
python github_issues.py
Once the data has been loaded, you can inspect the created dataset using the Streamlit app:
dlt pipeline github_issues show
Append or replace your data
Try running the pipeline again with python github_issues.py. You will notice that the issues table contains two copies of the same data. This happens because the default load mode is append. Appending is useful when each run brings new records, for example daily data updates that you want to ingest.
To get the latest data, we'd need to run the script again. But how do we do that without duplicating the data?
One option is to tell dlt to replace the data in existing tables in the destination by using the replace write disposition. Change the github_issues.py script to the following:
import dlt
from dlt.sources.helpers import requests
# Specify the URL of the API endpoint
url = "https://api.github.com/repos/dlt-hub/dlt/issues"
# Make a request and check if it was successful
response = requests.get(url)
response.raise_for_status()
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="duckdb",
    dataset_name="github_data",
)
# The response contains a list of issues
load_info = pipeline.run(
    response.json(),
    table_name="issues",
    write_disposition="replace",  # <-- Add this line
)
print(load_info)
Run this script twice to see that the issues table still contains only one copy of the data.
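The difference between the two write dispositions can be illustrated without a database. The sketch below is only a toy model: the in-memory `Destination` dictionary and the `load` function are hypothetical names invented for this illustration and are not part of dlt. It shows why two runs in append mode produce duplicate rows while two runs in replace mode do not.

```python
from typing import Any

# Hypothetical in-memory "destination": table name -> list of rows.
Destination = dict[str, list[dict[str, Any]]]


def load(
    destination: Destination,
    table_name: str,
    rows: list[dict[str, Any]],
    write_disposition: str = "append",
) -> None:
    """Load rows into a table using 'append' or 'replace' semantics."""
    if write_disposition == "append":
        # Keep the existing rows and add the new ones after them.
        destination.setdefault(table_name, []).extend(rows)
    elif write_disposition == "replace":
        # Discard whatever was in the table and keep only the new rows.
        destination[table_name] = list(rows)
    else:
        raise ValueError(f"unsupported write disposition: {write_disposition}")


issues = [{"id": 1, "title": "first"}, {"id": 2, "title": "second"}]

appended: Destination = {}
load(appended, "issues", issues)  # first run
load(appended, "issues", issues)  # second run duplicates the rows

replaced: Destination = {}
load(replaced, "issues", issues, "replace")  # first run
load(replaced, "issues", issues, "replace")  # second run overwrites the table

print(len(appended["issues"]))  # 4
print(len(replaced["issues"]))  # 2
```

Replace keeps the table in sync with the latest response at the cost of reloading everything on each run.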
What if the API has changed and new fields get added to the response? dlt will migrate your tables!
See the replace mode and table schema migration in action in our Schema evolution colab demo.
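To make the idea of migrating a table concrete, here is a deliberately simplified sketch of what "new fields get added" means for a list of columns. The `evolve_columns` function is a hypothetical helper written for this illustration; it does not model data types or anything else dlt does during a real schema migration.

```python
from typing import Any


def evolve_columns(columns: list[str], rows: list[dict[str, Any]]) -> list[str]:
    """Return the column list extended with any field names not seen before."""
    evolved = list(columns)
    for row in rows:
        for name in row:
            if name not in evolved:
                # A new field appeared in the data: add it as a new column.
                evolved.append(name)
    return evolved


# First load: the API returns two fields per issue.
columns = evolve_columns([], [{"id": 1, "title": "Bug"}])

# Later load: the API response now carries an extra "labels" field.
columns = evolve_columns(columns, [{"id": 2, "title": "Feature", "labels": "docs"}])

print(columns)  # ['id', 'title', 'labels']
```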
Declare loading behavior
So far we have been passing the data to the run method directly. This is a quick way to get started. However, frequently, you receive data in chunks, and you want to load it as it arrives. For example, you might want to load data from an API endpoint with pagination or a large file that does not fit in memory. In such cases, you can use Python generators as a data source.
You can pass a generator to the run method directly or use the @dlt.resource decorator to turn the generator into a dlt resource. The decorator allows you to specify the loading behavior and relevant resource parameters.
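As a minimal sketch of the generator idea, the example below yields data in chunks instead of building one large list. It uses only the standard library; `read_in_chunks` and the 250-record stream are made up for this illustration. A generator of this shape is the kind of object the text above describes passing to the run method or decorating with @dlt.resource.

```python
from itertools import islice
from typing import Any, Iterable, Iterator


def read_in_chunks(
    records: Iterable[dict[str, Any]], chunk_size: int
) -> Iterator[list[dict[str, Any]]]:
    """Yield lists of at most chunk_size records without loading them all at once."""
    iterator = iter(records)
    while True:
        # Pull the next chunk_size records from the underlying stream.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# Stand-in for a large file or a paginated API: a lazy stream of 250 records.
stream = ({"id": i} for i in range(250))

chunk_sizes = [len(chunk) for chunk in read_in_chunks(stream, chunk_size=100)]
print(chunk_sizes)  # [100, 100, 50]
```

Only one chunk is held in memory at a time, which is what makes this approach suitable for data that arrives page by page.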
Load only new data (incremental loading)
Let's improve our GitHub API example and get only issues that were created since the last load.
Instead of using the replace write disposition and downloading all issues each time the pipeline is run, we do the following:
import dlt
from dlt.sources.helpers import requests
@dlt.resource(table_name="issues", write_disposition="append")
def get_issues(
    created_at=dlt.sources.incremental("created_at", initial_value="1970-01-01T00:00:00Z")
):
    # NOTE: we read only open issues to minimize the number of calls to the API.
    # There's a limit of ~50 calls for unauthenticated GitHub users.
    url = (
        "https://api.github.com/repos/dlt-hub/dlt/issues"
        "?per_page=100&sort=created&direction=desc&state=open"
    )
    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()
        # Stop requesting pages if the last element was already
        # older than the initial value.
        # Note: incremental will skip those items anyway, we just
        # do not want to use up the API rate limit.
        if created_at.start_out_of_range:
            break
        # Get the next page
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]
pipeline = dlt.pipeline(
    pipeline_name="github_issues_incremental",
    destination="duckdb",
    dataset_name="github_data_append",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
Let's take a closer look at the code above.
We use the @dlt.resource decorator to declare the table name into which data will be loaded and specify the append write disposition.
We request issues for the dlt-hub/dlt repository ordered by the created_at field (descending) and yield them page by page in the get_issues generator function.
We also use dlt.sources.incremental to track the created_at field present in each issue and keep only the newly created ones.
Now run the script. It loads all the open issues from our repo to duckdb. Run it again, and you can see that no issues got added (if no issues were created in the meantime).
Now you can run this script on a daily schedule and each day you’ll load only issues created after the time of the previous pipeline run.
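The "last value" idea behind incremental loading can be sketched in a few lines of plain Python. This is an illustration only, not how dlt implements it: `select_new` and the `state` dictionary are hypothetical, the comparison is strict, and dlt's own boundary handling and deduplication are not modeled. It relies on the fact that UTC timestamps in the same ISO 8601 format sort correctly as strings.

```python
from typing import Any


def select_new(
    items: list[dict[str, Any]],
    state: dict[str, str],
    cursor_field: str = "created_at",
    initial_value: str = "1970-01-01T00:00:00Z",
) -> list[dict[str, Any]]:
    """Return items newer than the stored cursor and advance the cursor."""
    last_value = state.get("last_value", initial_value)
    new_items = [item for item in items if item[cursor_field] > last_value]
    if new_items:
        # Remember the highest value seen so the next run starts from it.
        state["last_value"] = max(item[cursor_field] for item in new_items)
    return new_items


state: dict[str, str] = {}  # persisted between runs in a real pipeline

first_batch = [
    {"id": 1, "created_at": "2024-01-01T10:00:00Z"},
    {"id": 2, "created_at": "2024-01-02T10:00:00Z"},
]
first_run = select_new(first_batch, state)

# The next run sees the same issues plus one created in the meantime.
second_batch = first_batch + [{"id": 3, "created_at": "2024-01-03T10:00:00Z"}]
second_run = select_new(second_batch, state)

print([item["id"] for item in first_run])   # [1, 2]
print([item["id"] for item in second_run])  # [3]
print(state)  # {'last_value': '2024-01-03T10:00:00Z'}
```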
Between pipeline runs, dlt keeps the state in the same database it loaded data to.
Peek into that state, the tables loaded and get other information with:
dlt pipeline -v github_issues_incremental info
Learn more:
- Declare your resources and group them in sources using Python decorators.
- Set up "last value" incremental loading.
- Inspect pipeline after loading.
- dlt command line interface.
Update and deduplicate your data
The script above finds new issues and adds them to the database.
It will ignore any updates to existing issues, such as changes to the issue text or emoji reactions.
To always get fresh content for all the issues, combine incremental loading with the merge write disposition, as in the script below.
import dlt
from dlt.sources.helpers import requests
@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    # NOTE: we read only open issues to minimize the number of calls to
    # the API. There's a limit of ~50 calls for unauthenticated
    # GitHub users.
    url = (
        "https://api.github.com/repos/dlt-hub/dlt/issues"
        f"?since={updated_at.last_value}&per_page=100&sort=updated"
        "&direction=desc&state=open"
    )
    while True:
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()
        # Get the next page
        if "next" not in response.links:
            break
        url = response.links["next"]["url"]
pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge",
    destination="duckdb",
    dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
Above, we add the primary_key argument to dlt.resource(), which tells dlt how to identify the issues in the database so that it can find duplicates and merge their content.
Note that we now track the updated_at field, so we keep all issues updated since the last pipeline run (which also includes those newly created).
Note how we use the since parameter of the GitHub API together with updated_at.last_value to tell GitHub to return only issues updated after the date we pass. updated_at.last_value holds the last updated_at value from the previous run.
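The effect of merging on a primary key can be pictured as an upsert: rows whose key already exists are overwritten, and rows with a new key are added. The sketch below shows that behavior on plain Python lists. The `merge` function is a hypothetical helper written for this illustration and says nothing about how dlt performs the merge in the destination.

```python
from typing import Any


def merge(
    table: list[dict[str, Any]],
    rows: list[dict[str, Any]],
    primary_key: str = "id",
) -> list[dict[str, Any]]:
    """Upsert rows into table: replace rows with a matching key, add the rest."""
    by_key = {row[primary_key]: row for row in table}
    for row in rows:
        # An existing key is overwritten in place; a new key is appended.
        by_key[row[primary_key]] = row
    return list(by_key.values())


table = [
    {"id": 1, "title": "Bug", "comments": 0},
    {"id": 2, "title": "Feature", "comments": 1},
]
updates = [
    {"id": 2, "title": "Feature", "comments": 5},  # updated issue
    {"id": 3, "title": "Docs", "comments": 0},     # newly created issue
]

merged = merge(table, updates)
print([row["id"] for row in merged])  # [1, 2, 3]
print(merged[1]["comments"])          # 5
```

Unlike append, running the same merge twice leaves the table unchanged, which is why it pairs well with tracking updated_at.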
Learn more about merge write disposition.
Using pagination helper
In the previous examples, we used the requests library to make HTTP requests to the GitHub API and handled pagination manually. dlt has a built-in REST client that simplifies API requests. We'll pick the paginate() helper from it for the next example. The paginate function takes a URL and optional parameters (quite similar to requests) and returns a generator that yields pages of data.
Here's how the updated script looks:
import dlt
from dlt.sources.helpers.rest_client import paginate
@dlt.resource(
    table_name="issues",
    write_disposition="merge",
    primary_key="id",
)
def get_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="1970-01-01T00:00:00Z")
):
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "since": updated_at.last_value,
            "per_page": 100,
            "sort": "updated",
            "direction": "desc",
            "state": "open",
        },
    ):
        yield page
pipeline = dlt.pipeline(
    pipeline_name="github_issues_merge",
    destination="duckdb",
    dataset_name="github_data_merge",
)
load_info = pipeline.run(get_issues)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("------")
print(load_info)
Let's zoom in on the changes:
- The while loop that handled pagination is replaced with reading pages from the paginate() generator.
- paginate() takes the URL of the API endpoint and optional parameters. In this case, we pass the since parameter to get only issues updated after the last pipeline run.
- We're not explicitly setting up pagination; paginate() handles it for us. Magic! Under the hood, paginate() analyzes the response and detects the pagination method used by the API. Read more about pagination in the REST client documentation.
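To see what a pagination helper saves you from writing, here is a small stand-alone sketch of one pagination method: following a "next" link until there is none, as the manual while loops in the earlier scripts did. Everything in it is hypothetical: `FAKE_API`, `fetch` and `paginate_by_next_link` are invented for this illustration, no network request is made, and it is not the implementation of dlt's paginate().

```python
from typing import Any, Callable, Iterator, Optional

Page = list[dict[str, Any]]

# Hypothetical fake API: each URL maps to (items on the page, next URL or None).
FAKE_API: dict[str, tuple[Page, Optional[str]]] = {
    "/issues?page=1": ([{"id": 1}, {"id": 2}], "/issues?page=2"),
    "/issues?page=2": ([{"id": 3}, {"id": 4}], "/issues?page=3"),
    "/issues?page=3": ([{"id": 5}], None),
}


def fetch(url: str) -> tuple[Page, Optional[str]]:
    """Stand-in for an HTTP request that returns a page and its next link."""
    return FAKE_API[url]


def paginate_by_next_link(
    start_url: str,
    fetch_page: Callable[[str], tuple[Page, Optional[str]]],
) -> Iterator[Page]:
    """Yield pages one by one, following next links until none is left."""
    url: Optional[str] = start_url
    while url is not None:
        items, url = fetch_page(url)
        yield items


pages = list(paginate_by_next_link("/issues?page=1", fetch))
print([len(page) for page in pages])  # [2, 2, 1]
```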
Next steps
Continue your journey with the Resource Grouping and Secrets tutorial.
If you want to take full advantage of the dlt library, then we strongly suggest that you build your sources out of existing building blocks:
- Pick your destinations.
- Check verified sources provided by us and community.
- Access your data with SQL or Pandas.
- Append, replace and merge your tables.
- Set up "last value" incremental loading.
- Set primary and merge keys, define the columns nullability and data types.
- Use built-in requests client.