Mailchimp Project Day 1

Today, our Data Engineering cohort started a four-day project to build a pipeline for Mailchimp data. The project involves extracting, loading, and transforming data with a variety of tools, including Python, AWS, Kestra, and dbt. The end goal is to enrich our data from the Amplitude API to help track user engagement with The Information Lab's email campaigns.

I took a sprint approach and did a little bit of everything.

My blueprint for today

Extraction with Python

My plan was to use the campaign list endpoint to retrieve all campaigns from the past 180 days, then use another endpoint to grab the list of recipients for each campaign, and finally retrieve information for every recipient, including email, company, and name.
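To make the 180-day window concrete, here is a minimal sketch of how the query parameters for the campaign list request could be built. It assumes the campaigns endpoint accepts a `since_send_time` filter in ISO 8601 format alongside the usual `count` and `offset`; the helper name `campaign_window_params` is hypothetical and is not part of my actual script.

```python
from datetime import datetime, timedelta, timezone


def campaign_window_params(now, days=180, count=1000, offset=0):
    """Build query parameters for campaigns sent in the last `days` days.

    Hypothetical helper: `since_send_time` is assumed to be the filter
    the campaign list endpoint uses for the send date.
    """
    since = now - timedelta(days=days)
    return {
        "since_send_time": since.isoformat(timespec="seconds"),
        "count": count,
        "offset": offset,
    }


# Fixed reference date so the result is reproducible
params = campaign_window_params(datetime(2025, 1, 1, tzinfo=timezone.utc))
print(params["since_send_time"])  # 2024-07-05T00:00:00+00:00
```

In a real run, `now` would be `datetime.now(timezone.utc)`, and the resulting dictionary would be passed as `params=` to `requests.get`.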

Because the number of records I can return with one call is capped at 1,000, I had to loop through pagination to get all records. The final number of recipients I fetched was 17,470. Below is my function to handle pagination.

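import logging

import pandas as pd
import requests

logger = logging.getLogger(__name__)
# API_KEY is assumed to be defined elsewhere in the script


def get_campaign_recipients(list_id, extract_time):
    """Fetch all members of a Mailchimp list, paging through results 1,000 at a time."""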
    url = f'https://us2.api.mailchimp.com/3.0/lists/{list_id}/members'
    members = []
    offset = 0
    count = 1000  # max Mailchimp allows

    # Get total number of members
    response = requests.get(
        url,
        auth=("anystring", API_KEY),
        params={"count": 1, "offset": 0}
    )
    data = response.json()
    total_items = data.get('total_items', 0)

    print(f'Total members in list {list_id}: {total_items}')

    # Paginate properly
    while True:
        response = requests.get(
            url,
            auth=("anystring", API_KEY),
            params={"count": count, "offset": offset}
        )
        data = response.json()
        page_members = data.get('members', [])

        # If no more members returned, stop
        if not page_members:
            break

        # Process this page
        for d in page_members:
            members.append({
                'email_address': d.get('email_address'),
                'status': d.get('status'),
                'company': d.get('company', ''),
                'full_name': d.get('full_name', ''),
                'timestamp_signup': d.get('timestamp_signup'),
                'last_changed': d.get('last_changed'),
                'extract_time': extract_time
            })

        # Increase offset for next page
        offset += count

        # Stop if we’ve fetched all items
        if len(members) >= total_items:
            break

    df_members = pd.DataFrame(members)

    print(f'Retrieved {len(members)} of {total_items} members from list {list_id}')
    logger.info(f'Retrieved {len(members)} of {total_items} members from list {list_id}')

    return df_members, extract_time
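
For a sense of how many requests a loop like this makes, the offsets can be worked out ahead of time. The sketch below is only illustrative: it treats the 17,470 recipients as if they came from a single list, and `page_offsets` is a hypothetical helper that is not part of the function above.

```python
import math


def page_offsets(total_items, page_size=1000):
    """Return the offset of every page needed to cover total_items records."""
    n_pages = math.ceil(total_items / page_size)
    return [page * page_size for page in range(n_pages)]


offsets = page_offsets(17470)
print(len(offsets), offsets[0], offsets[-1])  # 18 0 17000
```

So 18 paged requests would cover the records, plus the one initial request that reads `total_items`.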

Loading to S3

The next step was to load the data to S3. I had already set up a bucket for Amplitude, so I reused it for expedience and created a subdirectory within it for Mailchimp data.
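
As a rough sketch of what this upload step can look like, the code below writes a list of records to CSV in memory and hands it to an S3 client. It is not necessarily how my script does it: the helper names `build_s3_key` and `upload_rows_as_csv` and the file naming scheme are assumptions, and the client is passed in as an argument so that any object with a `put_object(Bucket=..., Key=..., Body=...)` method will work.

```python
import csv
import io


def build_s3_key(prefix, extract_time, name="mailchimp_members"):
    """Build an object key under the given prefix (hypothetical naming scheme)."""
    safe_time = extract_time.replace(":", "-")  # avoid colons in the file name
    return f"{prefix.rstrip('/')}/{name}_{safe_time}.csv"


def upload_rows_as_csv(s3_client, bucket, key, rows):
    """Serialize a list of dicts to CSV in memory and upload it with put_object."""
    if not rows:
        raise ValueError("no rows to upload")
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=buffer.getvalue().encode("utf-8"),
    )
    return key
```

With boto3 installed, `boto3.client("s3")` could be passed as `s3_client`, and the rows could come from `df_members.to_dict("records")`.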

Staging to Snowflake

Afterwards, I created a new stage in Snowflake that points at the Mailchimp data in S3.

CREATE OR REPLACE STAGE mailchimp_python_stage
  STORAGE_INTEGRATION = cy_amplitude_python_import
  URL = 's3://cyi-amplitude-storage/python-import/mailchimp/'
  FILE_FORMAT = cyi_csv_format;

Now, I had records of all recipients of campaigns in the last 180 days.

Next Steps

  1. Grab campaign activity to go along with the record of campaign recipients
  2. Finish setting up a dedicated S3 bucket for Mailchimp data
  3. Set up the dbt repository

Author: Charles Yi