Recently, for a client, I had to build a data pipeline to extract, load, and transform data from the European Automobile Manufacturers' Association (ACEA). I eventually provided an Excel deliverable to the end user. I wanted to build a personal project based on what I did at work, but with a twist: for this project, I re-built the workflow by incorporating two of my recent obsessions, dbt and agentic AI. The result is a sample production-style workflow that combines deterministic data engineering with natural-language AI orchestration.
At a high level, the workflow runs like this:
- Ingest source data (documents/files)
- Parse and stage raw records
- Load into warehouse
- Transform with dbt into medallion-style layers
- Publish modeled output to BI
- Triage and validate using BI-native MCP tools
- Produce consistent visual outputs with a reusable AI visualization skill
Agentic AI acts as the glue between these various steps as data traverses the extract, load, and transform stages. I'll first show how I configured the agent, then walk through each stage to show how the data is transformed and how AI enhances the process.
Establishing AI Directives
In an agentic workflow, the AI agent is provided a series of commands, or directives, in a Markdown (.md) file. Using natural language, I can give the AI a series of do's and don'ts to control its output in a systematic way.
Within my IDE, I created a directives/pdf_scrape.md file to establish my initial directives. Below is an excerpt showing how I began the markdown file:
# PDF scrape and verification
## Goal
Parse data from PDFs saved in a user-specified directory into intermediate csvs. Ask user to verify content.
As I developed the workflow and identified potential pitfalls, I continuously updated this file to guide the AI agent more effectively.
Extraction
The first major step in the workflow is to web-scrape PDFs from ACEA’s website. For context, ACEA publishes monthly PDF reports on car registrations across the EU. My goal was to download these PDFs and extract the relevant data.
Anyone who has attempted persistent web scraping knows the process is prone to breaking whenever the website changes. Add to that the complexity of parsing data from PDFs—which can vary in structure with each publication—and maintaining the extraction process becomes a real challenge.
This is where an agentic workflow shines. I allowed the AI to take full control of this step. In my directives, I instructed the agent to generate its own scraping and parsing code. To ensure persistence, I asked it to store all produced scripts in an execution folder. This folder is also where I stored code I personally created.
Additionally, I created a separate schema folder containing .csv files with sample schemas to guide the agent in generating scraping and parsing code. Finally, I instructed the agent to request user validation before moving on to the next step.
1. **Fetch all PDFs in directory**
- There is no execution .py file for this. Come up with the best way to grab data from the PDF and store the script in `execution` folder.
- Folder schema provided in `schema` folder for each directory. For example,
if you are pulling data from PDFs listed under the `PDFs/ACEA` folder, then use `schema/ACEA.csv`
- Make sure you are grabbing the region description correctly
- Exclude 'Others' from manufacturer field as it causes duplication issues
- Output intermediate results into the `data` folder
2. **Verification**
- Agent (you) read the csvs
- Wait for verification from user
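The directives above describe the shape of the code the agent generates rather than the code itself. As a hypothetical sketch (the row layout, schema columns, and function names here are my assumptions, not the agent's actual output), the post-extraction cleanup it produces looks roughly like this:

```python
# Hypothetical sketch of the parsing helper the agent generates from these
# directives. The actual PDF extraction (e.g. with a library like pdfplumber)
# depends on each report's table structure and is omitted here.
import csv
from pathlib import Path

def clean_rows(raw_rows, columns):
    """Turn raw extracted table rows into records matching the schema CSV."""
    records = []
    for row in raw_rows:
        record = dict(zip(columns, (cell.strip() for cell in row)))
        # directive: exclude 'Others' from the manufacturer field
        # because it causes duplication issues downstream
        if record.get("Manufacturer") == "Others":
            continue
        records.append(record)
    return records

def write_intermediate_csv(records, columns, out_path):
    """Dump parsed records into the data/ folder for user verification."""
    out_path = Path(out_path)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        writer.writerows(records)
```

The agent then pauses on the intermediate CSVs and waits for the user's go-ahead, per the verification directive.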
If the scraping or parsing breaks, I ask the agent to self-anneal. Self-annealing is a term borrowed from metallurgy, where it describes the re-crystallization process of certain metals. When an AI agent self-anneals, it adjusts its code based on error messages and stack traces. In theory, this allows the agent to handle future changes to the website or PDF format, except in cases where the site blocks access or the required data is entirely missing from the PDF.
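Stripped of the agent specifics, the self-annealing loop is just a retry loop that feeds the failure back into code generation. A minimal sketch (where `regenerate` stands in for whatever agent call rewrites the script from the error, an assumption on my part):

```python
# Minimal sketch of a self-annealing retry loop. `run_step` executes the
# current script; `regenerate` is a stand-in for the agent call that rewrites
# the script based on the captured stack trace (not a real API).
import traceback

def self_anneal(run_step, regenerate, max_attempts=3):
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return run_step()
        except Exception:
            last_error = traceback.format_exc()
            # feed the stack trace back so the next version can be adjusted
            regenerate(last_error)
    raise RuntimeError(f"Step still failing after {max_attempts} attempts:\n{last_error}")
```

The key property is that the loop is bounded: if the agent cannot fix the step within a few attempts (blocked site, missing data), the failure surfaces to the user instead of looping forever.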
Load
For the loading stage, I directed the agent to use code that I developed. The code (below) uploads the temporary .csv files generated from parsing the PDFs to Snowflake. After the upload is complete, it deletes the files from the directory to keep the development environment clean.
import pandas as pd
import datetime
import dotenv
import os
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from execution.logger import logger_setup

dotenv.load_dotenv()
logger = logger_setup()

def upload_to_snowflake():
    snowflake_user = os.getenv("snowflake_user")
    snowflake_password = os.getenv("snowflake_password")
    snowflake_account = os.getenv("snowflake_account")
    snowflake_schema = os.getenv("snowflake_schema")
    snowflake_database = os.getenv("snowflake_database")

    # fetch ACEA csv files in data folder only
    data_dir = "data/"
    csv_paths = []
    for root, dirs, files in os.walk(data_dir):
        csv_files = [file for file in files if file.startswith('Press_release') and file.endswith('.csv')]
        csv_paths.extend([os.path.join(root, file) for file in csv_files])
    logger.info(f"Found {len(csv_paths)} ACEA Press_release CSV files for upload")
    if not csv_paths:
        raise ValueError("No ACEA Press_release CSV files found in data/ for upload")
    df = pd.concat([pd.read_csv(path) for path in csv_paths], ignore_index=True)

    # clean data: remove commas from Units column and convert to numeric
    df['Units'] = df['Units'].astype(str).str.replace(',', '').astype(float)

    # add a timestamp column to the dataframe
    df['inserted_at'] = datetime.datetime.now().isoformat()

    # snowflake connection
    conn = snowflake.connector.connect(
        user=snowflake_user,
        password=snowflake_password,
        account=snowflake_account,
        schema=snowflake_schema,
        database=snowflake_database
    )

    # writing data to table
    write_pandas(conn,
                 df,
                 'ACEA_DATA',
                 schema=snowflake_schema,
                 database=snowflake_database,
                 quote_identifiers=False,
                 overwrite=True,
                 auto_create_table=True)

    # delete uploaded ACEA files after upload
    for path in csv_paths:
        os.remove(path)
    logger.info(f"Uploaded {len(df)} records to Snowflake and deleted local CSV files")

    # close connection
    conn.close()

if __name__ == "__main__":
    upload_to_snowflake()
In the directive, I instruct the agent to use this code to produce a deterministic upload process.
3. **Uploading to Snowflake**
- Once verified, upload to Snowflake using `execution/upload_snowflake.py`
- All uploaded data files should be deleted
Transform
The transformation stage is handled in dbt Core following a medallion-style data architecture. In the bronze (staging) layer, I performed column renames for clarity and consistency. In the silver (intermediate) layer, I de-duplicated the data and enforced uniqueness checks, since PDFs can update figures for previously reported months. Finally, in the gold (marts) layer, I used a dbt Python model to generate a series of metrics, including trailing 12 months (TTM), year-to-date (YTD), period-over-period, and year-over-year calculations.
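To make the metric definitions concrete before the full model, here is a toy illustration (not the production code) of the YTD and TTM logic on a single monthly series; the real model applies the same idea per manufacturer/region/frequency group:

```python
# Toy illustration of the gold-layer metric logic on one monthly series.
import pandas as pd

units = pd.Series(
    [10.0, 20.0, 30.0, 40.0],
    index=pd.date_range("2024-10-31", periods=4, freq="M"),
)

ytd = units.groupby(units.index.year).cumsum()          # resets each January
ttm = units.rolling(window=12, min_periods=1).sum()     # trailing 12-month sum
yoy = units.pct_change(12, fill_method=None)            # NaN until a full year of history
```

With only four months of data, YTD restarts at 40 in January 2025 while TTM keeps accumulating to 100, which is exactly the distinction the mart layer needs to expose.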
def model(dbt, session):
    import pandas as pd
    import os

    # read data
    df = dbt.ref('int_acea_data').to_pandas()

    # parse month strings (e.g. 'Jan-24') into month-end dates
    df['DATE'] = pd.to_datetime(df['MONTH'], format='%b-%y') + pd.offsets.MonthEnd(0)
    df.drop(columns=['MONTH'], inplace=True)

    # removing YTD rows - will re-calculate
    df_filtered = df[df['FREQUENCY'] != 'YTD'].copy()

    # resampling to a regular monthly grid per cut
    df_filtered.set_index('DATE', inplace=True)
    df_filtered = df_filtered.groupby(['MANUFACTURER', 'FREQUENCY', 'REGION']).resample('M')['UNITS'].mean().ffill().reset_index()

    to_ret = df_filtered.copy()

    # calculating YTD values
    df_ytd = to_ret.copy()
    df_ytd['Year'] = df_ytd['DATE'].dt.year
    df_ytd['YTD'] = df_ytd.groupby(['MANUFACTURER', 'FREQUENCY', 'REGION', 'Year'])['UNITS'].cumsum()
    df_ytd = df_ytd.drop(columns=['Year'])

    # calculating rolling 12-month sum (TTM)
    df_ttm = to_ret.copy()
    df_ttm = df_ttm.sort_values(['MANUFACTURER', 'FREQUENCY', 'REGION', 'DATE']).reset_index(drop=True)
    df_ttm['TTM'] = df_ttm.groupby(['MANUFACTURER', 'FREQUENCY', 'REGION'], sort=False)['UNITS'].transform(
        lambda x: x.rolling(window=12, min_periods=1).sum()
    )

    # concatenating YTD and TTM cuts back to main df
    to_ret['TTM'] = df_ttm['TTM']
    to_ret['YTD'] = df_ytd['YTD']

    # calculate PoP growth rate for each measure
    measures = ['UNITS', 'YTD', 'TTM']
    to_ret[[f'{m}_PoP' for m in measures]] = to_ret.groupby(['MANUFACTURER', 'FREQUENCY', 'REGION'])[measures].pct_change(fill_method=None)

    # calculating yearly growth rate for quarterly and monthly cuts
    yoy = to_ret.groupby(
        ['MANUFACTURER', 'REGION', 'FREQUENCY'],
        group_keys=False,
    )[measures].apply(lambda g: g.pct_change(12 if g.name[-1] == 'M' else 4, fill_method=None))
    to_ret[[f'{m}_YoY' for m in measures]] = yoy

    # melting to long format for BI consumption
    df_melted = pd.melt(to_ret,
                        id_vars=['MANUFACTURER', 'REGION', 'FREQUENCY', 'DATE'],
                        value_vars=['UNITS', 'YTD', 'TTM', 'UNITS_PoP', 'YTD_PoP', 'TTM_PoP', 'UNITS_YoY', 'YTD_YoY', 'TTM_YoY'],
                        var_name='Measure',
                        value_name='Value')
    return df_melted
Like the loading process, I asked the AI agent to use the code I created and leverage dbt MCP to build the models in Snowflake.
4. **Building dbt models**
- Use dbt mcp or codegen for all tasks associated with dbt
- Build all models in `dbt/`
- Use project venv dbt Core CLI (not `dbt-fusion`) when running Python models, since Fusion CLI may skip Python model execution in this project
- From repo root, run dbt with `./.venv/Scripts/dbt.exe` (Windows)
- Example commands:
- `cd dbt ; ../.venv/Scripts/dbt.exe build -s int_acea_metrics`
- `cd dbt ; ../.venv/Scripts/dbt.exe build -s stg_acea_data --full-refresh` (when full rebuild requested)
- Whenever a new model is created, add to appropriate .yml file (ex: `_int.yml`)
Data Delivery and Triage
With that, the ELT pipeline is complete—supported and supervised by a self-annealing AI agent. The next step is delivering the data to the end user.
For this final stage, I leveraged Tableau MCP and a data visualization skill from mcpmarket.com. An MCP connects an AI agent to an external resource (like Tableau Server), while a skill provides custom procedural instructions for performing specific tasks. I provided the agent with code to publish the gold-layer model as a Published Datasource in Tableau Server using Tableau Server Client and pantab, a Python wrapper for Tableau’s Hyper API.
import pandas as pd
import pantab
import dotenv
import os
import snowflake.connector
import tableauserverclient as TSC
from execution.logger import logger_setup

logger = logger_setup()
dotenv.load_dotenv()

snowflake_user = os.getenv("snowflake_user")
snowflake_password = os.getenv("snowflake_password")
snowflake_account = os.getenv("snowflake_account")
snowflake_schema = os.getenv("snowflake_schema")
snowflake_database = os.getenv("snowflake_database")
tableau_user = os.getenv("tableau_user")
tableau_password = os.getenv("tableau_password")
tableau_server = os.getenv("tableau_server")
tableau_site = os.getenv("tableau_site")

def create_hyper(snowflake_table="MARTS_ACEA_METRICS", hyper_path="hyper_files/marts_acea.hyper"):
    """
    Creating hyper file for Tableau datasource upload.

    :param snowflake_table: name of the table in Snowflake to read data from
    :param hyper_path: path to save the generated hyper file
    """
    try:
        conn = snowflake.connector.connect(
            user=snowflake_user,
            password=snowflake_password,
            account=snowflake_account,
            schema=snowflake_schema,
            database=snowflake_database
        )
        mart_acea_df = pd.read_sql(f'SELECT * FROM {snowflake_table}', conn)
        logger.info(f"Data read from Snowflake: {mart_acea_df.shape[0]} rows, {mart_acea_df.shape[1]} columns")
        pantab.frame_to_hyper(mart_acea_df, hyper_path, table="marts_acea_metrics")
        logger.info(f"Hyper file created at: {hyper_path}")
        conn.close()
    except Exception as e:
        logger.error(f"Error creating hyper file: {e}")
        raise

def upload_to_tableau(project_name):
    """
    Upload hyper file to Tableau Server.

    :param project_name: name of the Tableau project to publish the datasource to
    """
    try:
        tableau_auth = TSC.TableauAuth(tableau_user, tableau_password, site_id=tableau_site)
        server = TSC.Server(tableau_server, use_server_version=True)
        with server.auth.sign_in(tableau_auth):
            # need to specify the project where datasource will be published
            req_option = TSC.RequestOptions()
            req_option.filter.add(TSC.Filter(TSC.RequestOptions.Field.Name,
                                             TSC.RequestOptions.Operator.Equals,
                                             project_name))
            project_item, pagination = server.projects.get(req_option)
            logger.info(f"Project retrieved: {project_name}")
            for p in project_item:
                project_id = p.id
            new_datasource = TSC.DatasourceItem(project_id)
            server.datasources.publish(
                new_datasource,
                file="hyper_files/marts_acea.hyper",
                mode='Overwrite'
            )
            logger.info(f"Datasource published to project: {project_name}")
    except Exception as e:
        logger.error(f"Error uploading to Tableau: {e}")
        raise

if __name__ == "__main__":
    try:
        create_hyper()
        upload_to_tableau('Charles')
    except Exception as e:
        exit(1)
Once the data is available in Tableau Server, I instruct the AI to use the Tableau MCP to respond to user queries via a chatbot. The agent leverages Tableau's VizQL Data Service (VDS) to query the published data source and download the results as .csv files. It then uses the previously defined skill to generate consistently themed visualizations. If desired, the user (or I) can also connect directly to Tableau to build more complex dashboards. Of course, even dashboard creation could be fully automated by an AI agent.
With this, we have a complete, end-to-end ELT workflow that incorporates data extraction, transformation, triaging, and delivery, all orchestrated and supervised by a self-annealing AI agent.
6. **Data triage**
- Use Tableau MCP Vizql Data Service (VDS) tool to triage published Tableau data sources
- **6a. Query + export dataset**
- Run Tableau MCP query (`mcp_tableau_query-datasource`) for the user’s triage question
- Export and save the returned rows as csv in the run folder
- **6b. Plot artifacts**
- Use the exported csv to generate plots using `skills/data-viz-plots`
- Save generated plots in the same run folder
- Always save triage artifacts under `analyses/` (never in temp directories)
- At minimum, save:
- query result dataset (csv)
- generated plots (png)
- short run metadata (txt or json) with datasource, filters, and timestamp
7. **Triage run output structure (required)**
- Each triage request must be saved into its own folder under `analyses/`
- Folder naming convention:
- `analyses/<YYYY-MM-DD>/<title>/`
- Date must not include timestamp
- Title must be the agent's best short summary of the request
- Do not overwrite previous runs; if the folder already exists, create a new one with a numeric suffix (example: `<title>_2`)
- Keep outputs for each run together (csv, png, metadata) in that run folder
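The folder convention in directive 7 is deterministic enough to pin down in code. A small helper like this (a hypothetical sketch, not part of the repo) captures what the agent is instructed to do:

```python
# Sketch of the run-folder convention from the triage directives: one folder
# per request under analyses/<YYYY-MM-DD>/<title>/, never overwriting runs.
import datetime
from pathlib import Path

def make_run_folder(title, base="analyses", today=None):
    date = (today or datetime.date.today()).isoformat()  # date only, no timestamp
    candidate = Path(base) / date / title
    suffix = 2
    # if a run with this title already exists, add a numeric suffix instead
    while candidate.exists():
        candidate = Path(base) / date / f"{title}_{suffix}"
        suffix += 1
    candidate.mkdir(parents=True)
    return candidate
```

The query result CSV, generated PNGs, and run metadata all land in the folder this returns, so each triage request stays self-contained.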
Conclusion
I really enjoyed building this workflow and learned a lot about how AI can be integrated to create a robust, self-annealing, and deterministic data pipeline. Seeing the potential of a workflow like this has been exciting, and I’m eager to continue exploring the possibilities in AI-driven data engineering!
Check out the GitHub repo: https://github.com/sknman92/dbt-Tableau-Agentic-Workflow-w.-European-Car-Registration-Data
