flowfunctions are used to develop, deploy and manage data workflows in BigQuery. This is the set of guidelines we follow and recommend when building these data workflows.
The ideal approach to managing inbound data is to use Google-native connections and methods for ingesting data. When this is not possible and third-party tools (such as Airbyte, Dataddo, Rivery or Stitch) or custom code are used to ingest data, some additional checks are recommended to validate data integrity.
Date partitioning by day is recommended for inbound data tables in all cases where data has a date dimension. Using date partitioning enables efficient monitoring of partitions using the INFORMATION_SCHEMA.PARTITIONS view, which allows quick and cost-effective analysis of partition existence and row counts to quickly highlight any missing data or potential duplicate data.
Timestamp partitioning may also be appropriate depending on the cadence of the inbound data, however the current maximum number of partitions per table is 4,000, which is over ten years of day-partitioned data but less than six months of hourly partitions. This means that hourly-partitioned data requires a more complex approach to ensure that limits are not hit.
In BigQuery the only reference to a table, view or routine is the full resource ID (project_id.dataset_name.table_name), so it is possible to copy, drop and recreate a table with the exact same schema but a different partitioning definition without negatively impacting inbound data. The DDL to achieve this, for each inbound table, would be:
```sql
CREATE OR REPLACE TABLE `project_id.dataset_name.original_table_name_temp`
PARTITION BY date_field AS
SELECT * FROM `project_id.dataset_name.original_table_name`;

DROP TABLE `project_id.dataset_name.original_table_name`;

CREATE OR REPLACE TABLE `project_id.dataset_name.original_table_name`
PARTITION BY date_field AS
SELECT * FROM `project_id.dataset_name.original_table_name_temp`;

DROP TABLE `project_id.dataset_name.original_table_name_temp`;
```
For reference, the equivalent functional approach would use a routine with the following signature:

DDL.PARTITION_TABLE_BY_DATE(table_id STRING, partition_column STRING, cluster_columns ARRAY<STRING>)
For a table which needs to be partitioned on a date field called date, and without table clustering, the execution syntax - assuming the @@dataset_project_id has been set to the appropriate region - would be:
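A hedged sketch of such a call follows; the table path is a hypothetical placeholder, and passing an empty array for the cluster columns is an assumption for "no clustering":

```sql
-- Assumes @@dataset_project_id has already been set as described above.
-- The table path is a hypothetical placeholder.
CALL DDL.PARTITION_TABLE_BY_DATE(
  'project_id.dataset_name.original_table_name',  -- table to repartition
  'date',                                         -- partition column
  []                                              -- no clustering (assumption)
);
```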
In reality, you might want to apply this same table upgrade to a number of different tables. This is made very simple by using a basic looping construct:
```sql
DECLARE tables_to_upgrade ARRAY<STRING>;

SET tables_to_upgrade = ['table_a_id', 'table_b_id', 'table_c_id', 'table_d_id'];

FOR each_table IN (SELECT table_id FROM UNNEST(tables_to_upgrade) AS table_id) DO
  CALL DDL.PARTITION_TABLE_BY_DATE(each_table.table_id, 'date', []);
END FOR;
```
In cases where data is received in files such as CSV, JSONL, Parquet or Avro into a GCS bucket, the data can be directly accessed in BigQuery using either external tables or BigLake tables. BigLake tables will typically provide higher performance but have additional setup complexity and management overhead, as you need to configure and authorize a connection to the data source. External tables can also be created from Google Sheets, but this is not recommended for ingesting data as there are potential performance implications, and they represent a potential failure point as they can be accessed and edited directly by humans.
Profiling of inbound data is an important initial stage for any BigQuery data workflow. If using partitioned tables, then inbound partition profiling using the INFORMATION_SCHEMA.PARTITIONS view enables quick analysis of data existence and row counts, which is a reliable and inexpensive way of assuring inbound data existence and size. For non-partitioned tables, depending on the structure it is also possible and recommended to profile the inbound data, for example by checking null counts and distributions. However this will involve query costs and be slower than partition profiling.
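As a hedged sketch (the project and dataset names are hypothetical placeholders), a basic partition profile might look like:

```sql
-- Row counts per day partition, to highlight missing or duplicated data
SELECT
  table_name,
  partition_id,
  total_rows
FROM `project_id.dataset_name.INFORMATION_SCHEMA.PARTITIONS`
WHERE partition_id NOT IN ('__NULL__', '__UNPARTITIONED__')
ORDER BY table_name, partition_id;
```

A partition with zero rows, a gap in the partition_id sequence, or an unexpectedly doubled row count is then immediately visible without scanning the underlying data.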
Building on the profiling functions, it is also recommended that the inbound data profile is monitored either passively via a monitoring dashboard, or actively via a Scheduled Query with ASSERT statements and email notifications, or Scheduled Queries and a Remote Function for Slack notifications.
Over time BigQuery projects can get quite messy and it is difficult to understand which resources are unused or invalid. As such it is worthwhile to periodically go through a cleanup process.
If views or functions reference resources which no longer exist, then they (and any resources referencing them) will also become invalid. If this has gone unnoticed, then the chances are that they will not be missed and can be deleted, backing up the SQL for peace of mind.
This is an onerous and potentially error-prone job to undertake manually, especially on larger projects. The recommended approach is to find all project views via a combination of the INFORMATION_SCHEMA.SCHEMATA to get all datasets and the INFORMATION_SCHEMA.TABLES filtered on table_type = 'VIEW', and then to use the API with the dry_run flag to identify invalid resources programmatically.
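The view-listing half of this approach might look like the following hedged sketch (the project and region qualifiers are hypothetical placeholders):

```sql
-- List every view in a project, across all datasets in one region
SELECT table_schema, table_name
FROM `project_id`.`region-eu`.INFORMATION_SCHEMA.TABLES
WHERE table_type = 'VIEW'
ORDER BY table_schema, table_name;
```

Each resulting view can then be validated programmatically, for example by issuing a query against it through the API with the dry run flag set, which checks validity without incurring query costs.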
It is also recommended to identify any tables for which there have been no load or query jobs in a preceding time period (e.g. a year), as these are likely causing clutter and not required. This is possible by combining the INFORMATION_SCHEMA.SCHEMATA to get all datasets, the INFORMATION_SCHEMA.TABLES to get all tables and the INFORMATION_SCHEMA.JOBS to get historic activity. Integrating these is a very powerful but complex exercise.
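A hedged sketch of this kind of integration follows; the project and region qualifiers are hypothetical placeholders, and note that INFORMATION_SCHEMA.JOBS only retains roughly the last 180 days, so covering a full year would require an exported job history:

```sql
-- Base tables with no query activity in the JOBS retention window
SELECT t.table_schema, t.table_name
FROM `project_id`.`region-eu`.INFORMATION_SCHEMA.TABLES AS t
LEFT JOIN (
  -- Every table referenced by any job in the retention window
  SELECT DISTINCT rt.dataset_id, rt.table_id
  FROM `project_id`.`region-eu`.INFORMATION_SCHEMA.JOBS AS j,
       UNNEST(j.referenced_tables) AS rt
) AS used
  ON t.table_schema = used.dataset_id
  AND t.table_name = used.table_id
WHERE t.table_type = 'BASE TABLE'
  AND used.table_id IS NULL;
```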
Although there is currently a new feature in BigQuery to view data lineage via an interactive visual graph, this simply enables you to visually inspect the upstream process for any table or view. In many cases it is more useful to identify downstream lineage, in order to assess the impact of any potential change like table schema changes, column name changes or table/view deletion before it is executed.
This is a non-trivial challenge, but one we solve by building both upstream and downstream dependency graphs programmatically and visually. This programmatic approach enables additional analysis to be undertaken using e.g. the INFORMATION_SCHEMA.COLUMNS to identify column existence in downstream resources.
Data backup is not natively implemented in BigQuery (although time travel does work for data recovery in some instances), however it is recommended for all inbound and outbound tables. This means that if there are any adverse events and tables are deleted or corrupted, they can be recovered with minimal disruption. The recommended approach is to regularly backup individual partitions to a Google Cloud Storage Bucket using a scheduled EXPORT DATA statement, exporting data in Parquet format and using native compression such as GZIP for cost management.
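A hedged sketch of such an export for a single day partition follows; the bucket, dataset, table and field names are all hypothetical placeholders:

```sql
-- Export one date partition to GCS as GZIP-compressed Parquet
EXPORT DATA OPTIONS (
  uri = 'gs://backup_bucket_name/dataset_name/table_name/2024-01-01/*.parquet',
  format = 'PARQUET',
  compression = 'GZIP',
  overwrite = true
) AS
SELECT *
FROM `project_id.dataset_name.table_name`
WHERE date_field = '2024-01-01';
```

In a Scheduled Query, the literal date would typically be replaced with the built-in @run_date parameter so that each execution backs up the relevant partition.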
Backups might not be required if - for example - the inbound data is stored in files in a GCS bucket and accessed in BigQuery via external or BigLake tables.
It is also easy to accidentally delete SQL-based views as well as routines and functions, which can cause significant disruption and rework. This code can be backed up to backup tables in BigQuery using a combination of the INFORMATION_SCHEMA.SCHEMATA, the INFORMATION_SCHEMA.ROUTINES, the INFORMATION_SCHEMA.PARAMETERS, the INFORMATION_SCHEMA.ROUTINE_OPTIONS and INFORMATION_SCHEMA.VIEWS. This can also be backed up as Parquet files into a GCS bucket for additional protection.
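For the view definitions specifically, a minimal hedged sketch of such a backup (the backup dataset and table names are hypothetical) could be:

```sql
-- Snapshot all view definitions in one dataset into a backup table
CREATE TABLE IF NOT EXISTS `project_id.backups.view_definitions` (
  backup_time TIMESTAMP,
  table_schema STRING,
  table_name STRING,
  view_definition STRING
);

INSERT INTO `project_id.backups.view_definitions`
SELECT CURRENT_TIMESTAMP(), table_schema, table_name, view_definition
FROM `project_id.dataset_name.INFORMATION_SCHEMA.VIEWS`;
```

Running this on a schedule gives a timestamped history of view SQL, from which any accidentally deleted view can be recreated.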
Backing up data is only half of the challenge: it is also recommended that a clear plan and set of steps is defined and tested to recover any data in case of loss. This can be actioned using scripted Data Definition Language and potentially the LOAD DATA statement.
Idempotency is a key concept to understand, which is explored in detail in this article, however the concept is quite simple: if you re-run the same action, the output should remain the same. If your data transformations are not idempotent then running the same activity twice will result in data duplication, with associated downstream impacts.
SQL transformations are often defined in Views, which are a simple and accessible method. However the main issue with this approach is that every time the view (or any views dependent on that view) is queried, all upstream data is queried and transformations are executed. This will result in degrading performance over time as data size grows.
The recommended approach is therefore to use table functions in place of views, with start_date and end_date arguments for each function, and date partitioning for inbound and outbound tables. This gives flexibility, idempotency and simpler QA and date-based profiling. We refer to these as Date-Partitioned Table Functions.
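A minimal sketch of a Date-Partitioned Table Function; all of the names here are hypothetical placeholders:

```sql
CREATE OR REPLACE TABLE FUNCTION `project_id.dataset_name.TF101_get_daily_events`(
  start_date DATE,
  end_date DATE
) AS
SELECT
  event_date,
  user_id,
  COUNT(*) AS event_count
FROM `project_id.dataset_name.inbound_events`
WHERE event_date BETWEEN start_date AND end_date  -- prunes date partitions
GROUP BY event_date, user_id;
```

Calling the function for the same date range always yields the same result, which is what makes downstream delete-and-insert reloads idempotent.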
Using BigQuery PROCEDURES we can then package up any required functions into a script and make the set of commands as simple to execute as:
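For example, with a hypothetical procedure name, the entire workflow reduces to:

```sql
CALL `project_id.dataset_name.run_daily_update`();
```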
This could, for example, DELETE the outbound date partitions (if existing) for the past three days and INSERT new transformed data.
It is also recommended to use BigQuery TRANSACTIONS so that if any part of the procedure fails (e.g. if a new column has been introduced so the INSERT fails) then any changes (e.g. the DELETE partitions action) would be rolled back, an error would be raised and the data would not be changed.
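A hedged sketch of this delete-and-insert pattern inside a transaction; the procedure, table and field names are hypothetical placeholders:

```sql
CREATE OR REPLACE PROCEDURE `project_id.dataset_name.run_daily_update`()
BEGIN
  BEGIN TRANSACTION;

  -- Remove the last three days of date partitions, if they exist
  DELETE FROM `project_id.dataset_name.outbound_table`
  WHERE event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY);

  -- Insert freshly transformed data for the same window
  INSERT INTO `project_id.dataset_name.outbound_table`
  SELECT event_date, user_id, COUNT(*) AS event_count
  FROM `project_id.dataset_name.inbound_events`
  WHERE event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 3 DAY)
  GROUP BY event_date, user_id;

  COMMIT TRANSACTION;

EXCEPTION WHEN ERROR THEN
  -- Roll back the DELETE if the INSERT (or anything else) fails
  ROLLBACK TRANSACTION;
  RAISE;
END;
```

Because the DELETE and INSERT either both commit or both roll back, re-running the procedure never duplicates or loses a partition.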
Naming is important but also very difficult. It is recommended that the naming convention ensures that resources can only have dependencies which are before them in an alphabetical-sorted list. This simplifies programmatic operations as actions can always be executed successfully by simple sorting.
We achieve this by using a prefix to indicate the resource type, followed by a numerical index and then a descriptive name beginning with an action verb (e.g. a table function might be prefixed TF202, followed by a verb-led description). In this instance, an incremented initial digit indicates a change of aggregation level (and therefore an expected change in row count for any date partition). This gives some flexibility, readability and easy reference (e.g. you can just refer to TF202 for brevity in code, docs or diagrams).
We recommend optimising for readability as opposed to SQL brevity, which means that Common Table Expressions (CTEs) are normally used as opposed to sub-queries. By naming the CTEs with an action verb, it should be possible to understand what each CTE is doing without comments. An example structure is:
```sql
WITH
get_view_dependencies AS (
  ...
),

get_scheduled_query_dependencies AS (
  ...
),

union_view_and_scheduled_query_dependencies AS (
  ...
)

SELECT * FROM union_view_and_scheduled_query_dependencies
```
We recommend using the in-built description fields in BigQuery as this enables users to see metadata in place, where the data or code actually is. It also enables the descriptions to be accessed programmatically using e.g. the INFORMATION_SCHEMA.TABLE_OPTIONS, which enables documentation to be automatically built in e.g. markdown for syncing to a code repository. It also makes the metadata viewable in Dataplex.
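A hedged sketch of reading those descriptions programmatically (the project and dataset names are hypothetical placeholders):

```sql
-- Table descriptions for one dataset, e.g. as input to generated docs
SELECT table_name, option_value AS description
FROM `project_id.dataset_name.INFORMATION_SCHEMA.TABLE_OPTIONS`
WHERE option_name = 'description';
```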
Comments are difficult to parse programmatically as there is no standardised structure in SQL, so they are not recommended.
We believe that documentation should be part of the development workflow, which means that it should not require additional overhead and should never get out of sync with production code.
Scheduling and Orchestration
For simplicity, we recommend using BigQuery Scheduled Queries to schedule workflows. However, since Scheduled Queries are technically operated by the BigQuery Data Transfer Service, any code written here is not accessible through the normal BigQuery Console view or any INFORMATION_SCHEMA view, which can make it difficult to manage.
We recommend packaging all actions into PROCEDURES in the relevant datasets, grouping the PROCEDURES into orchestration functions (also PROCEDURES) and then executing these functions using Scheduled Queries. This enables you to sequence dependent jobs without having to estimate how long each will take to execute and timing them accordingly.
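A minimal sketch of this layering, with hypothetical procedure names; the Scheduled Query itself then contains only the final CALL:

```sql
CREATE OR REPLACE PROCEDURE `project_id.dataset_name.run_all_updates`()
BEGIN
  -- Executed strictly in sequence, so each job waits for its dependency
  CALL `project_id.dataset_name.update_inbound_checks`();
  CALL `project_id.dataset_name.update_transformations`();
  CALL `project_id.dataset_name.update_outbound_tables`();
END;

-- The Scheduled Query definition is then simply:
-- CALL `project_id.dataset_name.run_all_updates`();
```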
Scheduled Query SQL definitions can be accessed through the Data Transfer API.
Notifications and Logging
Email notifications should be enabled, however these will only go to the single account which set up the scheduled query. These emails can, though, be filtered and forwarded to e.g. Slack for notifications.
It is also possible to send all Scheduled Query execution results to a PubSub topic, which can now be streamed directly into BigQuery and/or used to trigger a Cloud Function to build a custom notification or trigger a custom workflow based on status.
Looker Studio Connection
Although BigQuery can be connected to a huge number of different Business Intelligence and dashboarding tools, we recommend Looker Studio as the native connection is highly performant, easy to manage and the solution is powerful, continually improving and free.
Partitioning for Performance
If your outbound table is date partitioned, then when you connect it to Looker Studio you will be asked if you want to use this field as the default date range field. Accepting this optimises the data queried and flowing to Looker Studio, which makes for a highly performant and low-maintenance approach.
If the upstream tables are partitioned, you can also leverage this functionality by building a custom query and using built-in date parameters, for example if you want to query the INFORMATION_SCHEMA.JOBS in an efficient manner:
```sql
SELECT *
FROM `[region-id]`.INFORMATION_SCHEMA.JOBS
WHERE DATE(creation_time)
  BETWEEN PARSE_DATE("%Y%m%d", @DS_START_DATE)
  AND PARSE_DATE("%Y%m%d", @DS_END_DATE)
```
Clustering output tables is also recommended on high-cardinality columns, which should also improve dashboard performance.
Looker Studio also enables us to define additional parameters which are available to us in custom queries, which can be a powerful and efficient way of pushing computation back to BigQuery, minimising data transfer between BigQuery and Looker Studio and optimising performance.
Looker Studio also has the capability to programmatically clone reports based on templates, while replacing any element (e.g. data source, title etc.) by using the Linking API to build cloning URLs.
This can be very useful for providing an efficient mechanism to copy the same report for multiple clients or customers.