Build Your Own LinkedIn Analytics Part 4: Ingesting Data

Published on Medium on 27 October 2025

In the previous article, we set up a workspace on Databricks Free Edition and prepared the infrastructure for ingesting the data. In this article, we finally start the ingestion process.

This is part of the following blog series:

TL;DR

  • We move from architecture to execution by ingesting LinkedIn Excel exports into Databricks Free Edition.
  • The process handles historical and daily data loads, validates files, and stores clean outputs in bronze tables for downstream use.
  • Along the way, we build reliability with error handling, schema checks, and staging folders, laying the foundation for data cleaning and transformation in the next part of the article series.

I. Reading the Excel files

In the second article, on choosing the data source, we identified the data that we would like to ingest from our chosen Excel files. A summary is in the tables below.

Data to ingest from historical content analytics.
Data to ingest from daily content analytics.
Data to ingest from post analytics.

a. Major caveats

There are two major caveats with the data as-is:

  1. The breakdown at post level for impressions and engagements covers only the top 50 posts, so the statistics of some posts may not be captured if they fall outside the top 50 for the time range. As discussed in the first article, this is not a big deal for me: my thought leadership push is only a few months old, and I do not yet expect to see impressions and/or engagement on more than 50 posts in a single day. Still, it needs to be confirmed and monitored.
  2. The Post URL is inconsistent for the Post Analytics file versus the Content Analytics file. This is a more serious matter since we are seeking a unique identifier for our LinkedIn posts, and Post URL would have been the perfect candidate if not for this inconsistency.

We will address these caveats later in this article. The priority now is to ingest the as-is data into the bronze layer.

II. Data Architecture: The Bronze Layer

Recall the purpose of a bronze layer from our data architecture article:

The bronze layer is where data from outside the data warehouse is supposed to land, with minimal or no cleaning and transformations. The idea is that this layer is a source of truth for external data, and any potential reingestion of raw data or any audits can start from this layer.

In other words, what we are ingesting should match our source data 1:1 as far as possible.

With that in mind, here’s our proposed data flow:

Proposed data flow from landing to bronze layer.

Let’s approach this on a sheet-by-sheet level:

a. Ingestion approach: ENGAGEMENT sheet in Content Analytics Excel file

The table from the ENGAGEMENT sheet can be ingested as-is from the historical and daily Content Analytics Excel files into a new totals table in the bronze linkedin schema. The strategy here would be to ingest the table from the historical data first, then do incremental updates afterwards using the daily data.

Schema for bronze.linkedin.totals table.

b. Ingestion approach: TOP POSTS sheet in Content Analytics Excel file

There are two tables in the TOP POSTS sheet of the daily Content Analytics Excel file, one each for impressions and engagements. We will ingest these into two separate tables, but now the question is: how do we ingest the multitude of daily files?

  1. One approach would be to use an incremental approach to update two master tables (e.g. impressions and engagements), similar to that used for the totals table. I decided against this because I did not want to entertain the risk of a bad ingest corrupting an entire table. This risk is more tolerable for the totals table because only two metrics (impressions and engagements) are being ingested on a daily basis, as opposed to the daily list of Post URLs and associated Post Dates.
  2. Another approach, and the one that I am taking, is to ingest each file into a separate, date-suffixed table (e.g. impressions_{YYYY}_{MM}_{DD} and engagements_{YYYY}_{MM}_{DD}). This ensures that any data corruption for any file ingestion is limited to that one table (fault isolation), and also allows me to keep track of which dates have already been ingested (audit). Incidentally, date-suffixed tables are a hallmark of Google Analytics daily exports on BigQuery.

In the real world, we could keep these tables around so that we can reingest where needed, while still adhering to organizational policies on data retention and hygiene. But now we run into a limitation of Databricks Free Edition:

There can be at most 100 tables per schema in Databricks Free Edition.

To address this limitation, we can modify our approach such that our time-suffixed tables are now treated as staging tables. In other words, these are working tables that will be purged as soon as their underlying data is written into the corresponding master silver tables. We will still be limited to ingesting not more than 100 days of data at a time, but a) such a scenario should be fairly rare and b) we can ingest in batches of 100 days, as long as our pipeline supports it (which it should, as we will see).

Schema for bronze.linkedin.engagements_{YYYY}_{MM}_{DD} staging tables. Note that hyphens cannot be part of the name of a Databricks artifact (e.g. table).
Schema for bronze.linkedin.impressions_{YYYY}_{MM}_{DD} staging tables. Note that hyphens cannot be part of the name of a Databricks artifact (e.g. table).
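As a sketch, the date-suffix naming convention for these staging tables can be expressed as a small helper (the prefix and date values here are illustrative):

```python
# Illustrative sketch: build the date-suffixed staging table names.
# Hyphens are not allowed in Databricks table names, so the date's
# hyphens become underscores.

def staging_table_name(prefix: str, analytics_date: str) -> str:
    """Append an underscore-delimited date suffix to a table prefix."""
    return f"{prefix}_{analytics_date.replace('-', '_')}"

engagements_table = staging_table_name("bronze.linkedin.engagements", "2025-10-27")
impressions_table = staging_table_name("bronze.linkedin.impressions", "2025-10-27")
print(engagements_table)  # bronze.linkedin.engagements_2025_10_27
```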

Staging tables are a common feature in real-world pipelines, and they are typically used as follows:

  1. For storing the intermediate results of complex transformations, so that the full transformation will take up less time and compute resources. You see this most often when data is being transformed into silver or gold layer tables.
  2. For storing the results of single file ingestions (like what we are doing) or single API calls, so as to persist the structured intermediate results for subsequent processing and not have to repeat ingesting the unstructured data multiple times. In the case of API calls, this is especially important for preserving the call results at a fixed point in time, since the results of API calls can be inconsistent or change over time.

We are also extracting the data for Post URL and Post Date into a separate posts table (deduplicating for each pairing of Post URL/Post Date) that will be enriched later. In this instance, we are using the staging tables rather than the original Excel files as our source.

Initial schema for bronze.linkedin.posts table.

c. Ingestion approach: FOLLOWERS sheet in Content Analytics Excel file

The table from the FOLLOWERS sheet can be ingested as-is from the historical and daily Content Analytics Excel files into a new followers table in the bronze linkedin schema. There is also the Total Followers figure, which can be ingested as an additional field matched to the end date of the date range that the file is associated with; this is, after all, the figure as it stands on that date.

Schema for bronze.linkedin.followers table.

d. Ingestion approach: PERFORMANCE sheet in Post Analytics Excel file

The data in the PERFORMANCE sheet of the Post Analytics file for each post is in transposed form; in other words, the names of the fields are listed down the side like a form rather than along the top. For our purposes, however, only the Post URL and Post Publish Time are of interest, so we will ingest that data into a separate post_details table.

Initial schema for bronze.linkedin.post_details table.

III. Writing to the bronze layer

We are now ready to write our code for ingesting our landing zone files. But before proceeding, we need to make sure that we have some sample files in place in the respective landing volumes.

a. Uploading the Excel files

There are two ways to get to the upload dialog box. The first way is to click the ‘Upload to this volume’ button on the top right when you are in the volume.

Example screencap from the landing.linkedin.content_daily volume.

The second way is to click ‘+ New’ at the top left, then choose ‘Add or upload data’.

After that, choose ‘Upload files to a volume’.

Whichever way you choose, you end up with the dialog box below. Add the files to be uploaded here, make sure that the destination path is correct, then click the ‘Upload’ button.

Example screencap of the ‘Upload files to volume’ dialog box.

To recap, these are the types of files to be exported from LinkedIn and then uploaded to the respective landing volumes:

Summary table of available Excel files to be exported.

Start with a sampling of files (e.g. 1 content historical file, 3 content daily files, 3 post files) for the purpose of development. We can ingest a full load of files once we have set up the whole pipeline. 

The Databricks UI allows file sizes of up to 5 GB. This is plenty for our use case. If you’re interested in a programmatic way of uploading files, see for example the Python SDK example provided by Databricks.

b. Creating our first notebook

It’s almost time to finally start coding, but there’s one last thing to do, and that is to create a Python notebook. There are many ways of creating pipelines on Databricks, and notebooks are just one of them. But notebooks are a great way to quickly prototype a proof-of-concept, and it’s straightforward to orchestrate them in Databricks as part of a production pipeline. They are thus ideal for aspiring data professionals.

These days you also have access to Databricks Assistant, which is the native equivalent of GenAI chatbot assistants such as GitHub Copilot or Microsoft Copilot. Databricks Assistant can be a great way to get started on blank notebooks (e.g. to produce boilerplate starter code or documentation) as well as provide targeted debugging suggestions, but be careful in using it to produce a whole production-ready pipeline. We will return to this subject towards the end of the series.

Navigate to the Workspace section of the Databricks instance, and click the ‘Create’ button on the top right. You’re presented with the below options:

Screencap of ‘Create’ dropdown in the Workspace section.

Choose ‘Notebook’ and create a notebook. If you’re familiar with Git and have a Git repository ready (e.g. on Github or Bitbucket), you can instead choose ‘Git folder’ and then create the notebook in there. We will return to the topic of code versioning and DevOps processes towards the end of this blog series.

Note also the sheer variety of artifacts that you can create in the workspace, including something called an ‘ETL Pipeline’. This is Databricks’ version of a low-code pipeline builder. We won’t be considering it for this project, as the skills used building in Python and SQL are far more transferable, but it can be a good way to have a less technical stakeholder self-service a pipeline that can potentially be converted to production later on as needed.

Now that you’ve created the notebook, click on its autogenerated name and rename it linkedin_pipeline_poc, to signify that this is a proof-of-concept (POC).

Rename the notebook by clicking on its title as illustrated.

Starting with a proof of concept is an example of iterative development, where we start small and expand on our code successively from there. We will see how it subsequently plays out.

c. Ingesting the historical content analytics data

We are starting with this ingest because it is a one-time ingest of historical data, and based on our data source evaluation in part two of the blog series it should be a straightforward ingestion that will serve as a base for tackling the other ingestions. To recap, this is where we will be pulling our data from:

Our source of historical analytics data
1. Initial exploration

Now in the POC notebook, write and run the following code:

%pip install openpyxl

The notebook will automatically connect to a serverless instance and install the above Python library for ingesting Excel files. For the purposes of our POC development, this is good enough.

Next, create a new code cell in the notebook and write the following code:

import pandas as pd

totals_file_path = "<insert name of the path>"

totals_df = pd.read_excel(
  totals_file_path, 
  sheet_name="ENGAGEMENT", 
  parse_dates=["Date"]
).dropna().iloc[:-1]
totals_df.columns = totals_df.columns.str.lower().str.replace(' ', '_')

spark.createDataFrame(
  totals_df
).write.format("delta").saveAsTable(
  "bronze.linkedin.totals"
)

We can extract the file path by navigating the catalog (third icon down on the left side of the notebook) to the file sitting in the content_historical volume, clicking the menu icon, and then choosing “Copy volume file path”, as illustrated below.

Let’s walk through the code:

  1. pandas is a popular Python library for ingesting small-to-medium-sized datasets in memory, and has a robust way of handling ingestion of Excel sheets. This ingestion method depends on openpyxl, which is why we installed the library earlier (it doesn’t come preinstalled in Databricks).
  2. We use the read_excel method in pandas to ingest the sheet into a pandas DataFrame, specifying the column that we want to process into a date format. See https://pandas.pydata.org/docs/reference/api/pandas.read_excel.html for the full documentation, including examples.
  3. A Spark context is created by default in Databricks. We make use of it to first convert the pandas DataFrame into a pyspark DataFrame, then write into a new bronze.linkedin.totals table in the Delta Lake format (which is the native format for Databricks tables).
  4. Prior to converting to a pyspark DataFrame, we also normalize the column names of the pandas DataFrame by setting them to lowercase and, importantly, replacing spaces with underscores. pyspark DataFrames, unlike pandas DataFrames, do not allow spaces in their column names, and trying otherwise results in a runtime error. This is just one of the many ways in which pyspark DataFrames have stricter conditions than pandas DataFrames.
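As a quick illustration of the normalization step in point 4, here it is applied to a toy DataFrame (the column names are stand-ins, not the actual sheet’s):

```python
import pandas as pd

# Toy stand-in for the ENGAGEMENT sheet; the real column names may differ.
df = pd.DataFrame({"Date": ["2025-10-27"], "Total Impressions": [100]})

# Lowercase and snake_case the columns so Spark will accept them
# (Spark rejects spaces in column names).
df.columns = df.columns.str.lower().str.replace(' ', '_')
print(list(df.columns))  # ['date', 'total_impressions']
```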
2. Prepping for production

We can run the code as is, but there are a few issues that we need to address if the code is to be readier for production:

  1. Our catalogs, tables and other references are hard-coded; this will make the code less maintainable in the long run because we will need to modify every hard-coded reference if there is a change required.
  2. We did not address what happens after we read our historical file in the pending folder; we want to move it to the processed folder when we’re done, or to the errors folder if there are errors.
  3. We hard-coded the filename for the historical file; in production we wouldn’t know the name of the file beforehand unless it is passed to the notebook as a parameter. (Spoiler: Databricks can only pass on folder paths as a trigger, rather than the full path of new files. We will come back to this later in this blog series.)

Let’s address these issues. First, we extract the path to the pending folder as illustrated below (similar to how we did it for the historical Excel file):

You would have extracted the following path:

/Volumes/landing/linkedin/content_historical/pending/

Now let’s modify our code as follows:

import pandas as pd

# 1. define our constants
LANDING_CATALOG = "landing"
LANDING_SCHEMA = "linkedin"
LANDING_VOLUME = "content_historical"

PENDING_FOLDER = "pending"
PROCESSED_FOLDER = "processed"
ERRORS_FOLDER = "errors"

BRONZE_CATALOG = "bronze"
BRONZE_SCHEMA = "linkedin"
BRONZE_TOTALS_TABLE = "totals"

# 2. set our input and output variables
source_volume = \
  f"/Volumes/{LANDING_CATALOG}/{LANDING_SCHEMA}/{LANDING_VOLUME}/"

landing_pending_folder = f"{source_volume}{PENDING_FOLDER}/"
landing_processed_folder = f"{source_volume}{PROCESSED_FOLDER}/"
landing_errors_folder = f"{source_volume}{ERRORS_FOLDER}/"

bronze_totals_table = \
  f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_TOTALS_TABLE}"

# 3. execute the ingestion

# extract the list of files from the pending folder
totals_files = [f.path for f in dbutils.fs.ls(landing_pending_folder)]

# tackle every file in the pending folder
for file_path in totals_files:

  # extract filename from file path
  filename = file_path.split('/')[-1]

  try:
    totals_df = pd.read_excel(
      file_path, 
      sheet_name="ENGAGEMENT", 
      parse_dates=["Date"]
    ).dropna().iloc[:-1]
    totals_df.columns = \
      totals_df.columns.str.lower().str.replace(' ', '_') 
    
    spark.createDataFrame(
      totals_df
    ).write.format("delta").saveAsTable(
      bronze_totals_table
    )

    dbutils.fs.mv(file_path, landing_processed_folder + filename)
  except Exception as e:
    print(e)
    dbutils.fs.mv(file_path, landing_errors_folder + filename)

We have done the following:

  1. Set our catalogs etc. as constants (represented by all-capital variable names, as is standard naming practice), and configured our variables accordingly.
  2. Read all files programmatically from the pending folder using the file system functions in dbutils (the built-in library that Databricks provides for interacting with the Databricks environment). See https://docs.databricks.com/aws/en/dev-tools/databricks-utils for a full reference as to what dbutils can do.
  3. Moved files to either the processed or errors folder, depending on whether the ingestion succeeded.
3. Making it resilient

But wait, do we actually want to read all files? What if the wrong file was uploaded? And how do we handle writing multiple files to the same table, or if the table already exists?

Let’s modify the code as follows:

# ... Import extra libraries
import re
from delta.tables import DeltaTable

# ... Define an extra constant
LINKEDIN_PROFILE_NAME = "<insert your profile name>"

# ... no other change in code up to step 3 of the code block ...

ingestion_timestamp = pd.Timestamp.now(tz="UTC")

# extract the list of files from the pending folder
historical_files_info = [
    (f.path, pd.to_datetime(f.modificationTime, unit='ms', utc=True)) 
    for f in dbutils.fs.ls(landing_pending_folder)
]

for file_path, file_timestamp in historical_files_info:
  # extract filename from file path
  filename = file_path.split('/')[-1]
  
  # define source and target paths for file
  pending_path = landing_pending_folder + filename
  processed_path = landing_processed_folder + filename
  errors_path = landing_errors_folder + filename

  # check if filename is of expected format
  if not re.search(
    r'Content_\d{4}-\d{2}-\d{2}_\d{4}-\d{2}-\d{2}_' 
    + LINKEDIN_PROFILE_NAME + r'\.xlsx', filename
  ):
    # move invalid filename to errors folder
    try:
      print(f"Invalid filename: Moving {file_path} to {errors_path}")
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as e:
      print(f"Failed to move file {pending_path}: {e}")
  else:
    # process valid filename
    try:
      print(f"Processing ENGAGEMENT sheet in {pending_path}")
      totals_df = pd.read_excel(
        pending_path, 
        sheet_name="ENGAGEMENT", 
        parse_dates=["Date"]
      ).dropna().iloc[:-1] 
      totals_df.columns = \
        totals_df.columns.str.lower().str.replace(' ', '_') 
      totals_df['ingestion_timestamp'] = ingestion_timestamp
      totals_df['source_file'] = filename
      totals_df['source_file_timestamp'] = file_timestamp
      
      # Write totals to Delta table with upsert logic
      print(f"Writing to {bronze_totals_table}")
      if not totals_df.empty:
        if spark.catalog.tableExists(bronze_totals_table):
          delta_table = DeltaTable.forName(spark, bronze_totals_table)
          delta_table.alias("t").merge(
            spark.createDataFrame(totals_df).alias("s"),
            "t.date = s.date"
          ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        else:
          spark.createDataFrame(
            totals_df
          ).write.format("delta").saveAsTable(
            bronze_totals_table
          )
      
      print(f"Processed: Moving {pending_path} to {processed_path}")
      dbutils.fs.mv(pending_path, processed_path)
    except Exception as e:
      print(e)
      print(f"Errors encountered: Moving {pending_path} to {errors_path}")
      dbutils.fs.mv(pending_path, errors_path)

The following modifications have been made:

  1. We have defined reusable variables for the paths used when moving files to the processed or errors folders.
  2. We have defined the expected filename format of our file using a regular expression, and created the logic to move the file to the errors folder if the filename is invalid.
  3. We have added logic to skip writing to the totals table if we extract no data from the file.
  4. We have added incremental upsert logic to our ingestion to update the totals table based on the Date field as and where necessary.
  5. We have added print statements to log what is happening at every stage; this is essential for debugging while developing the code, as well as for any kind of future troubleshooting.
  6. We have added metadata relating to the file ingestion (i.e. ingestion time, source filename and source file modification timestamp) to our bronze table schema. These fields weren’t part of our initial schema design, but I’ve decided to include them so that we can better keep track of where the source data comes from.

Now we can run this code, and at the end we should see the creation of the bronze.linkedin.totals table, as well as the moving of the Excel historical file from the pending to the processed folder.

Snapshot of the newly ingested bronze.linkedin.totals table.
Historical file is now in processed folder.

As an exercise, modify the code to also process the followers data from the FOLLOWERS sheet into the bronze.linkedin.followers table. Since the table in the FOLLOWERS sheet starts at row 3 rather than row 1, the read_excel function will require an extra parameter skiprows=2; otherwise, the code should be very similar.
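If you’re unsure what skiprows does, here is a minimal illustration using CSV text as a stand-in for the FOLLOWERS sheet (read_excel accepts the same parameter; the values below are made up):

```python
import io
import pandas as pd

# CSV text standing in for the FOLLOWERS sheet: two metadata rows,
# then the real header on row 3 (values are made up).
raw = "Total Followers,1234\n\nDate,New followers\n2025-10-27,5\n"

# skiprows=2 skips the first two rows so the header is read correctly;
# pd.read_excel accepts the same parameter.
followers_df = pd.read_csv(io.StringIO(raw), skiprows=2)
print(list(followers_df.columns))  # ['Date', 'New followers']
```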

d. Ingesting the daily content analytics data

We can adapt our approach to ingesting historical content when it comes to ingesting daily content. In fact, we can clone the historical data code with a few initial modifications, since this daily data also needs to write to the bronze totals table. These are the initial modifications:

LANDING_HISTORICAL_VOLUME = "content_historical" 
#  => (change to) 
LANDING_DAILY_VOLUME = "content_daily"

################################################

source_volume = \
  f"/Volumes/{LANDING_CATALOG}/{LANDING_SCHEMA}/{LANDING_VOLUME}/"
#  =>
source_volume = \
  f"/Volumes/{LANDING_CATALOG}/{LANDING_SCHEMA}/{LANDING_DAILY_VOLUME}/"

################################################
## New additions

BRONZE_IMPRESSIONS_TABLE = "impressions"
BRONZE_ENGAGEMENTS_TABLE = "engagements"

bronze_impressions_table_prefix = \
    f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_IMPRESSIONS_TABLE}'
bronze_engagements_table_prefix = \
    f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_ENGAGEMENTS_TABLE}'

# get current count of tables in bronze schema
table_count_in_bronze_schema = spark.sql(
    f"SHOW TABLES IN {BRONZE_CATALOG}.{BRONZE_SCHEMA}"
).count()
dates_processed = []

################################################

historical_files_info
#  =>
daily_files_info

################################################

re.search(
    r'Content_\d{4}-\d{2}-\d{2}_\d{4}-\d{2}-\d{2}_' 
    + LINKEDIN_PROFILE_NAME + r'\.xlsx', filename
)
#  =>
re.search(
    r'Content_(\d{4}-\d{2}-\d{2})_\1_' # both dates need to match
    + LINKEDIN_PROFILE_NAME + r'\.xlsx', filename
)

################################################
# dropping the last value for daily content means dropping the only value
# we want to avoid that

..._df = pd.read_excel(...).dropna().iloc[:-1]
#  =>
..._df = pd.read_excel(...).dropna() 

Now we add the code for ingesting the TOP POSTS sheet into engagements and impressions tables:

# ... add this code after the processing of the 
# ENGAGEMENTS and FOLLOWERS sheet ...

      # extract date of analytics and append it to staging table names
      analytics_date_str = re.search(
        r'Content_(\d{4}-\d{2}-\d{2})_\1_', filename
      ).group(1)
      analytics_date = pd.to_datetime(analytics_date_str)
      date_suffix = analytics_date_str.replace('-', '_')
      bronze_engagements_table = \
        f"{bronze_engagements_table_prefix}_{date_suffix}"
      bronze_impressions_table = \
        f"{bronze_impressions_table_prefix}_{date_suffix}"

      tables_to_create = 0
      # check if bronze_engagements_table exists in bronze schema
      if not spark.catalog.tableExists(bronze_engagements_table):
        tables_to_create += 1
  
      # check if bronze_impressions_table exists in bronze schema
      if not spark.catalog.tableExists(bronze_impressions_table):
        tables_to_create += 1
  
      # exit if too many tables in bronze schema 
      # (specific to Databricks Free Edition)
      if table_count_in_bronze_schema + tables_to_create > 100:
        print("Too many tables in bronze schema. "
              "Please process and clear staging tables before rerunning.")
        break

      # read top posts from xlsx file  
      print(f"Processing TOP POSTS sheet in {pending_path}")
      topposts_df = pd.read_excel(
        pending_path, sheet_name="TOP POSTS", skiprows=2
      )

      engagements_df = topposts_df.iloc[:, :3].dropna()
      engagements_df.columns = \
        engagements_df.columns.str.replace(' ', '_').str.lower()
      engagements_df['analytics_date'] = analytics_date
      engagements_df['ingestion_timestamp'] = ingestion_timestamp
      engagements_df['source_file'] = filename
      engagements_df['source_file_timestamp'] = file_timestamp

      impressions_df = topposts_df.iloc[:, 4:].dropna()
      impressions_df.columns = \
        impressions_df.columns.str.replace(
          '.1', '', regex=False
        ).str.replace(' ', '_').str.lower()
      impressions_df['analytics_date'] = analytics_date
      impressions_df['ingestion_timestamp'] = ingestion_timestamp
      impressions_df['source_file'] = filename
      impressions_df['source_file_timestamp'] = file_timestamp

      # Write engagements to Delta staging table, overwrite existing
      print(f"Writing to {bronze_engagements_table}")
      if not engagements_df.empty:
        spark.createDataFrame(
          engagements_df
        ).write.mode("overwrite").saveAsTable(
          bronze_engagements_table
        )

      # Write impressions to Delta staging table, overwrite existing
      print(f"Writing to {bronze_impressions_table}")
      if not impressions_df.empty:
        spark.createDataFrame(
          impressions_df
        ).write.mode("overwrite").saveAsTable(
          bronze_impressions_table
        )

      # update counters for dates processed and tables in bronze schema
      dates_processed.append(analytics_date_str)  
      table_count_in_bronze_schema += tables_to_create

# ... add at the end of the cell

if len(dates_processed) == 0:
    print("No dates processed.")
else:
    print(f"Dates processed: {dates_processed}")

Here’s what’s happening in this code snippet:

  1. We are reading TOP POSTS in as a single DataFrame to begin with (skipping the first two rows like we did with FOLLOWERS), then splitting it into two separate DataFrames for engagements and impressions.
  2. Technically the column names in the impressions DataFrame (post_url and post_publish_date) are repeated from the column names in the engagements DataFrame. pandas automatically appends a ‘.1’ to the column names in such cases, so we will need to remove that suffix in the column names for the impressions DataFrame.
  3. We are overwriting our timestamped staging tables; each staging table represents an ingest from a single file, so it makes sense for them to be fully replaced if they already exist.
  4. Remember that we can only have a maximum of 100 tables in a schema for Databricks Free Edition. To mitigate that, we get the current count of tables in the bronze schema and keep track of how many staging tables we are creating along the way. If we are about to exceed 100 tables, we log the issue and exit the file processing loop.
  5. We track the dates of the files we process for debugging purposes.
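The ‘.1’ suffix behaviour from point 2 can be demonstrated with a toy example (CSV text stands in for the Excel sheet; the column names and values are illustrative):

```python
import io
import pandas as pd

# CSV text standing in for the TOP POSTS sheet; pandas de-duplicates
# repeated column names on read by appending '.1', '.2', etc.
raw = "Post URL,Engagements,Post URL,Impressions\nu1,3,u1,50\n"
df = pd.read_csv(io.StringIO(raw))
print(list(df.columns))
# ['Post URL', 'Engagements', 'Post URL.1', 'Impressions']

# Strip the '.1' suffix from the impressions half, as in the pipeline.
impressions = df.iloc[:, 2:].copy()
impressions.columns = impressions.columns.str.replace('.1', '', regex=False)
print(list(impressions.columns))  # ['Post URL', 'Impressions']
```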

Assuming we have 3 test files to ingest and assuming that TOP POSTS has data for both engagements and impressions in all 3 files, the bronze catalog should now look something like this:

Example screencap of bronze catalog with staging tables.

We still need to ingest the post timestamp and URL into a separate posts table, as discussed earlier. Now that we have the staging tables, we can use them as the source in a subsequent cell. The code is as follows:

# Extract posts data using post_url from all impressions tables 
# and save to bronze posts table

from pyspark.sql.functions import col, row_number, desc
from pyspark.sql.window import Window
from delta.tables import DeltaTable

BRONZE_POSTS_TABLE = "posts"

bronze_posts_table = \
  f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_POSTS_TABLE}'

# List all impressions staging tables in the bronze.linkedin schema
daily_impressions_tables = \
  [table.name for table in spark.catalog.listTables(
    f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}'
   ) if table.name.startswith(BRONZE_IMPRESSIONS_TABLE)]

print("Loading daily impressions tables to extract posts data...")
# Read each table and select the relevant columns
columns_to_select = [
  "post_url", "post_publish_date", 
  "ingestion_timestamp", "source_file", "source_file_timestamp"
]
dfs = [
  spark.read.table(
    f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{table}"
  ).select(*columns_to_select) for table in daily_impressions_tables
]

# Union all dataframes and remove duplicates
if dfs:
  impressions_df = dfs[0]
  for df in dfs[1:]:
      impressions_df = impressions_df.unionByName(df)
  impressions_df = impressions_df.withColumn(
    "row_num",
    row_number().over(
      Window.partitionBy(
        "post_url", "post_publish_date"
      ).orderBy(desc("ingestion_timestamp"))
    )
  ).filter(col("row_num") == 1).drop("row_num")
  impressions_pd = \
    impressions_df.toPandas()[columns_to_select]
  impressions_pd['post_publish_date'] = \
    pd.to_datetime(impressions_pd['post_publish_date']).dt.date
else:
  impressions_pd = pd.DataFrame(columns=columns_to_select)

# Fetch existing post URLs from the bronze.linkedin.posts table
if spark.catalog.tableExists(bronze_posts_table):
  existing_posts_df = \
    spark.read.table(bronze_posts_table).select("post_url").toPandas()
  existing_post_urls = existing_posts_df['post_url'].tolist()
else:
  existing_post_urls = []

# Filter out post URLs that are already in the existing table
impressions_pd = \
  impressions_pd[~impressions_pd['post_url'].isin(existing_post_urls)]

if not impressions_pd.empty:

  # Save the dataframe to the bronze.linkedin.posts table
  impressions_spark_df = spark.createDataFrame(impressions_pd)
  if not spark.catalog.tableExists(bronze_posts_table):
    print(f"Creating {bronze_posts_table} table...")
    impressions_spark_df.write.format("delta").saveAsTable(
      bronze_posts_table
    )
  else:
    print(f"Appending new posts to {bronze_posts_table} table...")
    impressions_spark_df.write.format("delta").mode("append").saveAsTable(
      bronze_posts_table
    )
else:
  print("No new posts found based on impressions tables.")

At a high level, this is what we are doing:

  1. Extracting the list of impressions staging tables directly from the bronze schema. We’re doing this so that the two parts of the ingestion process are independent of each other, so each cell can be run without worrying about hidden dependencies.
  2. Extracting distinct post URL and post publish date metadata based on the latest ingestion time. This is a common use case for window functions; if you need a refresher, check out Databricks’s SQL-based reference.
  3. Appending only new records to the posts table using post URL as the primary key. In this instance we are only interested in post URL and post publish date, which are both static values per post.
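Step 2 above — keeping only the latest record per (post URL, post publish date) — is the classic deduplication pattern behind the `row_number()` window function. The same logic, sketched in pandas with made-up sample data so it can be tried without a Spark session:

```python
import pandas as pd

# Hypothetical sample: the same post ingested twice, at different times
df = pd.DataFrame({
    "post_url": ["u1", "u1", "u2"],
    "post_publish_date": ["2025-10-01", "2025-10-01", "2025-10-02"],
    "ingestion_timestamp": pd.to_datetime(
        ["2025-10-20", "2025-10-25", "2025-10-21"]
    ),
})

# Equivalent of row_number().over(Window.partitionBy(...)
#   .orderBy(desc("ingestion_timestamp"))) == 1:
# sort newest-first, then keep the first row per partition key
deduped = (
    df.sort_values("ingestion_timestamp", ascending=False)
      .drop_duplicates(subset=["post_url", "post_publish_date"])
)
print(len(deduped))  # 2
```

The Spark version scales the same idea to distributed data; the pandas version is handy for unit-testing the rule itself.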

e. Ingesting the post analytics data

Recall that we only want Post URL and Post timestamp from the post analytics data. Even so, I’d like to pull in the other analytic information for potential future use. For reference, this is what the sample sheets look like:

Screencap for PERFORMANCE sheet.
Screencap for TOP DEMOGRAPHICS sheet

The list of metrics in PERFORMANCE is highly variable, so it makes sense to pull it in as a dictionary of key-value pairs.
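In miniature, the key-value idea looks like this (the metric names below are hypothetical, not the exact strings LinkedIn exports):

```python
import pandas as pd

# A two-column, header-less sheet becomes a key -> value dictionary,
# so a variable list of metrics never breaks the schema
rows = [["Impressions", 1200], ["Reactions", 45], ["Comments", 7]]
kv_df = pd.DataFrame(rows, columns=["key", "value"])
metrics = dict(zip(kv_df["key"], kv_df["value"]))
print(metrics)  # {'Impressions': 1200, 'Reactions': 45, 'Comments': 7}
```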

The code is as follows:

import pandas as pd
import re
import datetime
from pyspark.sql import Row
from delta.tables import DeltaTable

# 1. define our constants
LINKEDIN_PROFILE_NAME = "your-profile-name"  # use your own profile name here

LANDING_CATALOG = "landing"
LANDING_SCHEMA = "linkedin"
LANDING_POSTS_VOLUME = "posts"

PENDING_FOLDER = "pending"
PROCESSED_FOLDER = "processed"
ERRORS_FOLDER = "errors"

BRONZE_CATALOG = "bronze"
BRONZE_SCHEMA = "linkedin"
BRONZE_POST_DETAILS_TABLE = "post_details"

# 2. set our input and output variables
source_volume = \
  f"/Volumes/{LANDING_CATALOG}/{LANDING_SCHEMA}/{LANDING_POSTS_VOLUME}/"

landing_pending_folder = f"{source_volume}{PENDING_FOLDER}/"
landing_processed_folder = f"{source_volume}{PROCESSED_FOLDER}/"
landing_errors_folder = f"{source_volume}{ERRORS_FOLDER}/"

bronze_post_details_table = \
  f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_POST_DETAILS_TABLE}"

# 3. execute the ingestion
ingestion_timestamp = datetime.datetime.utcnow()

# extract the list of files from the pending folder
post_files_info = [
    (
      f.path, 
      pd.to_datetime(
        f.modificationTime, unit='ms', utc=True
      ).to_pydatetime()
    ) for f in dbutils.fs.ls(landing_pending_folder)
]

for file_path, file_timestamp in post_files_info:
 
  # extract filename from file path
  filename = file_path.split('/')[-1]
  
  # define source and target paths for file
  pending_path = landing_pending_folder + filename
  processed_path = landing_processed_folder + filename
  errors_path = landing_errors_folder + filename

  # check if filename is of expected format
  if re.search(
    r'PostAnalytics_' + LINKEDIN_PROFILE_NAME + r'_\d+(?: \(\d+\))?\.xlsx',
    filename
  ):
      
    # process valid filename
    try:
      # read totals from xlsx file
      print(
        "Extracting post URL and timestamp from PERFORMANCE sheet "
        f"in {pending_path}"
      )
      post_details_raw_df = pd.read_excel(
        pending_path, 
        sheet_name="PERFORMANCE", 
        header=None,
        names=["key", "value"],
      ).dropna(how='all')

      # Extract timestamp, post URL and date of analytics
      transposed_post_details_df = \
        post_details_raw_df.dropna().set_index('key').transpose()
      post_timedelta_str = \
        transposed_post_details_df["Post Publish Time"].iloc[0]

      # Extract date range of analytics from specific string pattern
      df_with_date_range = \
        post_details_raw_df[post_details_raw_df['key'].str.contains(
          " Highlights ", na=False
        )]['key']
      if df_with_date_range.empty:
        print("Cannot detect analytics date")
        date_start_str = transposed_post_details_df["Post Date"].iloc[0]
        date_end_str = None
      else:
        date_range_str = \
          df_with_date_range.str.split(" Highlights ").iloc[0][1]
        [date_start_str, date_end_str] = date_range_str.split(" to ")

      post_timestamp = pd.to_datetime(
        date_start_str + ' ' + post_timedelta_str + ' UTC'
      ).to_pydatetime()

      post_url = transposed_post_details_df["Post URL"].iloc[0]
      if date_end_str:
        analytics_date = \
          pd.to_datetime(date_end_str).to_pydatetime().date()
      else:
        analytics_date = None

      post_top_demographics_df = pd.read_excel(
        pending_path, 
        sheet_name="TOP DEMOGRAPHICS", 
      ).dropna()
      post_top_demographics_df.columns = \
        post_top_demographics_df.columns.str.lower().str.replace(
          '%', 'percent'
        )
      
      print(f"Preparing post details record for {post_url}")
      post_details_record = Row(
        post_url=post_url, 
        post_timestamp=post_timestamp,
        analytics_date=analytics_date,
        performance_metrics=str(transposed_post_details_df.to_dict()),
        demographics_metrics=str(post_top_demographics_df.to_dict()),
        ingestion_timestamp=ingestion_timestamp,
        source_file=file_path,
        source_file_timestamp=file_timestamp,
      )

      # Write post_details_record to Delta table with upsert logic 
      # based on latest analytics date
      print(
        f"Writing to {bronze_post_details_table}: {post_details_record}"
      )
      post_details_spark_df = \
        spark.createDataFrame([post_details_record])
      if spark.catalog.tableExists(bronze_post_details_table):
        delta_table = \
          DeltaTable.forName(spark, bronze_post_details_table)
        delta_table.alias("t").merge(
          post_details_spark_df.alias("s"),
          "t.post_url = s.post_url and "
          "t.analytics_date = s.analytics_date"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
      else:
        post_details_spark_df.write.format("delta").saveAsTable(
          bronze_post_details_table
        )

      print(f"Processed: Moving {pending_path} to {processed_path}")
      dbutils.fs.mv(pending_path, processed_path)


    except Exception as e:
      print(e)
      print(f"Errors encountered: Moving {pending_path} to {errors_path}")
      dbutils.fs.mv(pending_path, errors_path)
  else:
    # move invalid filename to errors folder
    try:
      print(f"Invalid filename: Moving {pending_path} to {errors_path}")
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as e:
      print(f"Failed to move file {pending_path}: {e}")

By now you should have seen patterns emerge in the code presented here. We will look into cleaning it up later in the blog series once we move nearer to making it production-ready. For now, let’s look at what the code is doing:

  1. Check that the filename is of the expected format. This includes duplicate files that automatically get a bracketed number appended at the end of the filename, because the name of the post analytics Excel file is the same regardless of what date it is downloaded on.
  2. Apply the headers of ‘key’ and ‘value’ to the raw, header-less table in the PERFORMANCE sheet.
  3. Where possible, extract the analytics date from the string matching “X Highlights from X date to Y date”, where “Y date” is the date the file was downloaded and hence the analytics date. There is no guarantee that there will be highlights (i.e. the post could have no engagement at all); in that case the code sets the analytics date to None.
  4. Transpose the PERFORMANCE table in order to extract the details about Post Date, Post Time and Post URL; generate the Post Timestamp in UTC format from the Post Date and Post Time.
  5. Pull the raw data for both PERFORMANCE and TOP DEMOGRAPHICS sheets into two separate JSON strings. This ensures that we can access this data in the future if we decide to use it; for now it is there just in case.
  6. Replace ‘%’ with ‘percent’ as a column name in TOP DEMOGRAPHICS (‘%’ is invalid as a pyspark DataFrame column name).
  7. Write the accumulated data as a pyspark Row into the bronze.linkedin.post_details table, using the post URL as well as the analytic date as a combined primary key. When using this method, one must ensure that the date and datetime values are set in Python’s default datetime type, rather than the version from pandas.
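To make the type caveat in step 7 concrete, here is the conversion in isolation (the timestamp is made up). `to_pydatetime()` and `.date()` produce the plain stdlib types the Row-building code relies on:

```python
import datetime
import pandas as pd

ts = pd.to_datetime("2025-10-27 09:30:00")

# pd.Timestamp subclasses datetime.datetime, but to_pydatetime() and
# .date() give you the exact stdlib types, avoiding type-inference
# surprises when spark.createDataFrame processes the Row
py_dt = ts.to_pydatetime()
py_date = py_dt.date()

print(type(py_dt).__name__, type(py_date).__name__)  # datetime date
```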

IV. What about post metadata?

We have now pulled in all the raw data, but there are some lingering issues:

  1. There is still the mismatch between Post URL in the post_details table and Post URL in other tables.
  2. We do not know the contents of the post, which could be important further down the line.

Here, we have no choice but to scrape data. Fortunately, I’ve found that once you click on either version of the Post URL, you get to the same LinkedIn post that has a static, canonical URL. That will be our starting point.

a. Scraping post metadata

Updated data flow for the bronze layer, now including the web crawling workflow for post metadata.

We will use the default requests library to retrieve the LinkedIn posts that the two separate Post URLs point to, and BeautifulSoup to parse the resultant HTML code. Make sure to install the BeautifulSoup library in the notebook by running the following code:

%pip install bs4

Web scraping is a whole other rabbit hole by itself, so we won’t cover it in much detail. If you want to know more, this tutorial is a good place to get started.

Suffice it to say that after exploring the web scraping results, I identified the following additional fields (in purple) that can be used to enrich the posts and post_details tables:

Updated schema for bronze.linkedin.posts table, now including the metadata returned from crawling (in purple).
Updated schema for bronze.linkedin.post_details table, now including the link retrieved from crawling (in purple). Excluding other crawled metadata since bronze.linkedin.posts table will have the same data for the same posts. This version also includes the additional ‘just-in-case’ raw data (in green) for performance and demographics analytics that we decided to store in this table.

Based also on my findings, I created the following helper functions that will subsequently be used to extract the required metadata:

# Helper functions for fetching and parsing LinkedIn post HTML content
import requests
from bs4 import BeautifulSoup

def get_html_content(url: str):
  """
  Fetches HTML content from a LinkedIn post URL.
  """
  headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
  }
  response = requests.get(url, headers=headers)
  return response.content

def get_content(html_content):
  """
  Extracts post content from HTML using BeautifulSoup.
  """
  soup = BeautifulSoup(html_content, "html.parser")
  content_wrapper = soup.find(
    'p', class_="attributed-text-segment-list__content"
  )
  if content_wrapper:
    return content_wrapper.get_text()
  return None
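Live LinkedIn responses are hard to reproduce, so a quick offline sanity check of the parsing logic is useful. The snippet below re-declares `get_content` so it is self-contained, and runs it against a hand-written mock (simplified markup, not real LinkedIn output):

```python
from bs4 import BeautifulSoup

def get_content(html_content):
    # Same parsing logic as the helper above
    soup = BeautifulSoup(html_content, "html.parser")
    wrapper = soup.find('p', class_="attributed-text-segment-list__content")
    return wrapper.get_text() if wrapper else None

# Simplified mock of a LinkedIn post page (not real LinkedIn markup)
sample_html = b"""
<html><body>
  <p class="attributed-text-segment-list__content">Hello from my post</p>
</body></html>
"""

print(get_content(sample_html))  # Hello from my post
```

A check like this guards the selector logic against accidental edits, even though it can’t detect LinkedIn changing its page structure.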

Now we are ready to modify the posts and post_details ingestion as follows:

# ... Add this code snippet just before writing to 
# the bronze.linkedin.posts table

    # Fetch HTML content for each post URL
    impressions_pd['html_content'] = \
      impressions_pd['post_url'].apply(get_html_content)
    # Extract link, title, and content from the HTML content
    impressions_pd['link'] = \
      impressions_pd['html_content'].apply(
        lambda x: BeautifulSoup(x, "html.parser").find('link').get('href')
      )
    impressions_pd['title'] = \
      impressions_pd['html_content'].apply(
        lambda x: BeautifulSoup(x, "html.parser").find("head").get_text(
          strip=True).split('|')[0].strip()
      )
    impressions_pd['content'] = \
      impressions_pd['html_content'].apply(get_content)

# ... modify the creation of the post_details_record as follows:

      # Fetch HTML content for each post URL
      print(f"Fetching HTML content and link for {post_url}")
      html_content = get_html_content(post_url)
      # Extract link from the HTML content
      link = \
        BeautifulSoup(html_content, "html.parser").find('link').get('href')
      
      print(f"Preparing post details record for {link}")
      post_details_record = Row(
          post_url=post_url, 
          link=link, # newly added
          post_timestamp=post_timestamp,
          analytics_date=analytics_date,
          performance_metrics=str(transposed_post_details_df.to_dict()),
          demographics_metrics=str(post_top_demographics_df.to_dict()),
          ingestion_timestamp=ingestion_timestamp,
          source_file=file_path,
          source_file_timestamp=file_timestamp,
      )

Now all the ingestion should be good to go. Let’s test on some data and do some checks…

b. …wait, what’s this?

The post metadata for these links has gone haywire!

I wondered why I was receiving nonsense links and content for some of the posts. It turns out that the requests were failing for posts older than about 7 years, and I happened to have impressions recorded for those older posts during the period of analysis.

The range of dates for the problematic posts.

What happens when the programmatic method fails? This is where we turn to the last resort: manual patching.

c. Patching the data

Updated data flow for the bronze layer, now including the patching workflow.

In the real world, data patching is an unfortunate and largely unavoidable fact of life. Corrupt or missing data can occur for a variety of reasons; maybe data is unavailable to be pulled via automatic means, or there was an error in data ingestion that cannot be recovered by re-running the pipeline. Someone (usually the data practitioner) will at some point need to step in and patch the flaw, or otherwise find a way to work around it. Otherwise the data becomes unreliable, which is a good way to get end-users to stop using the data product.

In this instance, there are 19 posts that could not be crawled. I really only needed the main URL, title and content of each post, so I judged that I could expend the effort to visit each of the links in turn and enter the values I needed manually. Unfortunately, Post Time isn’t one of the features that I could retrieve, but the posts were old enough that I didn’t see the missing information as a big deal.

Schema for the bronze.linkedin.post_patch table.

I created the patch file in Excel, then saved it in TSV (tab-delimited values) format and directly ingested it into Databricks as a table. In a more controlled data environment, I would instead upload the patch file to a separate volume (e.g. misc, as illustrated in the updated data flow diagram), and then process the patch as part of the ingestion pipeline.

Screencap of the creation of the post_patch table via file upload. Note the advanced options selected.
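For reference, the upload wizard is doing roughly this under the hood. In pandas terms, a TSV patch file parses like so (the column names and rows below are hypothetical; the real file has one row per broken post):

```python
import io
import pandas as pd

# Hypothetical two-row patch file, tab-delimited
tsv_data = (
    "post_url\tlink\ttitle\tcontent\n"
    "https://example.com/a\thttps://example.com/a-canonical\tTitle A\tBody A\n"
    "https://example.com/b\thttps://example.com/b-canonical\tTitle B\tBody B\n"
)

patch_df = pd.read_csv(io.StringIO(tsv_data), sep="\t")
print(patch_df.shape)  # (2, 4)
```

On Databricks, the equivalent programmatic route would be a Spark read with the separator and header options set, followed by a write to the bronze table, which is what you would automate in a pipeline-driven patch workflow.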

And we now finally have all the raw data we need to move to the next stage. 

If you’re wondering why we haven’t cleaned the posts or post_details table, that’s not the purpose of the bronze layer. The silver layer is where the heavy work of cleaning and transformation really begins.

V. What’s Next?

In the next article, we can move on to the cleaning and transformation stage: making our data ready for a deeper analysis. See you there!

Yingzhao Ouyang is an AI and data engineering specialist with a distinctive blend of humanities, business, and technical expertise, bringing a uniquely holistic perspective to enterprise data challenges that others with purely technical backgrounds miss. To find out more, follow his LinkedIn profile at https://www.linkedin.com/in/yzouyang/
