The task with type `do` handles data migration and manipulation in your data lake. Well-known use cases are data aggregation (for reporting purposes), data backups, and storage loads. Depending on how you configure the root properties, the task behaves differently; the exact type of task is reported in the log. You can use the `do` task for the operations described in the sections below.
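Every variant combines the same root properties; which ones you set selects the behavior. A minimal sketch of that shape (the empty mappings are placeholders, not a working configuration):

```yaml
task:
  type: do
  # Where the data comes from: a table, a query, or a storage location
  extract: {}
  # Optional: how existing data in the destination is purged before loading
  deduplicate: {}
  # Where the data goes: a storage location or a database table
  load: {}
```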
## Table to storage

Materializes a table in a storage location in your data lake. A well-known use case is backing up a table.
root property | description |
---|---|
extract | Configure a table identifier for BigQuery or for Snowflake. Don't use `query` or `template`; see Query to storage for that. |
deduplicate | Configure a storage deduplication. |
load | Configure as a storage load. |
```yaml
task:
  type: do
  extract:
    source: database
    dataset_id: osb_analytics
    # Use the $ sign to export a partition (only in BigQuery)
    # This generates a partition of the previous hour in UTC timezone
    table_id: hits${{ task.trigger_date.subtract(hours=1).in_timezone('UTC').strftime('%Y%m%d%H') }}
  deduplicate:
    # Purge all files in the storage destination
    type: delete_all
  load:
    destination: storage
    object_name_or_glob: |
      {{ task.trigger_date.subtract(hours=1).in_timezone('UTC').strftime('%Y') }}/{{ task.trigger_date.subtract(hours=1).in_timezone('UTC').strftime('%m') }}/{{ task.trigger_date.subtract(hours=1).in_timezone('UTC').strftime('%d') }}/{{ task.trigger_date.subtract(hours=1).in_timezone('UTC').strftime('%H') }}/hits_*.json.gz
    bucket: osb-analytics-backup
    compression: gzip
    export_format: json
```
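For illustration, assume the task triggers at 2023-06-01 14:30 UTC; the templates above then resolve to the previous hour:

```yaml
# Rendered values for an assumed trigger_date of 2023-06-01 14:30 UTC
table_id: hits$2023060113
object_name_or_glob: 2023/06/01/13/hits_*.json.gz
```

The four `strftime` calls could equally be collapsed into a single `strftime('%Y/%m/%d/%H')`.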
## Query to storage

Materializes a query result in a storage location in your data lake.
root property | description |
---|---|
extract | Configure as a query extract for BigQuery or a query extract for Snowflake. |
deduplicate | Configure a storage deduplication. |
load | Configure as a storage load. |
```yaml
task:
  type: do
  # Extract
  extract:
    query: |
      SELECT date, id, index
      FROM `factbase-augment.transfer_testdata.some_table`
  # Purge all files in the storage destination
  deduplicate:
    type: delete_all
  load:
    destination: storage
    bucket: some_bucket
    object_name_or_glob: test_folder/data_*.csv.gz
    compression: gzip
    export_format: csv
```
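Note that a BigQuery export larger than 1 GB has to be split across multiple files; the `*` in `data_*.csv.gz` is then replaced with a zero-padded shard number, e.g. `test_folder/data_000000000000.csv.gz`.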
## Query to table

Materializes a query result in a table. A well-known use case is data aggregation for reporting.
root property | description |
---|---|
extract | Configure as a query extract for BigQuery or a query extract for Snowflake. |
deduplicate | Configure a table deduplication. |
load | Configure as a database load for BigQuery or Snowflake. |
```yaml
task:
  type: do
  # Extract
  extract:
    query: |
      SELECT date, id, index
      FROM `factbase-augment.transfer_testdata.some_table`
  # Delete duplicate rows
  deduplicate:
    type: keys
    keys:
      - id
      - date
      - index
  load:
    destination: database
    dataset_id: transfer_testdata
    table_id: storage_to_table
```
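With `type: keys`, duplicates are detected on the combination of `id`, `date`, and `index`, so re-running the task should not pile up duplicate rows in `storage_to_table`.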
## Query

Executes a query. A well-known use case is running DML statements on tables.
root property | description |
---|---|
extract | Configure as a query extract for BigQuery or a query extract for Snowflake. |
```yaml
task:
  type: do
  # Extract
  extract:
    query: |
      UPDATE `factbase-augment.transfer_testdata.storage_to_table`
      SET id = TO_BASE64(MD5(id))
      WHERE 1=1
```
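Because only `extract` is configured, the statement is executed and nothing is loaded or deduplicated. Any other DML statement works the same way; for example, a hypothetical retention clean-up (the cutoff date is illustrative):

```yaml
task:
  type: do
  extract:
    query: |
      -- Hypothetical retention clean-up; the cutoff date is illustrative
      DELETE FROM `factbase-augment.transfer_testdata.storage_to_table`
      WHERE date < '2020-01-01'
```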
## Storage to table

Loads blobs from storage into a table.
root property | description |
---|---|
extract | Configure as a storage location. |
file | Configure a file format. |
schema | Configure a schema if you use Snowflake or want to force some field names and types in BigQuery. |
deduplicate | Configure a table deduplication. |
load | Configure as a database load for BigQuery or Snowflake. |
```yaml
task:
  type: do
  start_date: today -2 days
  # Extract
  extract:
    source: storage
    bucket: my-bucket
    prefix: test_folder/storage_to_table.csv.gz
  file:
    type: txt
    delimiter: ','
    encoding: UTF-8
    text_enclosure: '"'
    new_line: '\n'
    escape_char: '\'
    has_header_row: yes
  # Delete duplicate rows
  deduplicate:
    type: date_range
    field: delivery_date
  load:
    destination: database
    dataset_id: transfer_testdata
    table_id: storage_to_table
```
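Here `start_date: today -2 days` makes the task reprocess the last two days, and the `date_range` deduplication on `delivery_date` presumably clears the matching date range in the destination before each load, so overlapping days are replaced rather than duplicated.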
## Validate

Validates a query result.
root property | description |
---|---|
validate | Configure a validation like in a `from` task. |
```yaml
# A test with nothing but a validate query in it
task:
  type: do
  validate:
    - query: |
        SELECT COUNT(1) AS lines,
               COUNT(DISTINCT index) AS indexes,
               COUNT(DISTINCT id) AS ids,
               COUNT(DISTINCT date) AS dates
        FROM `factbase-augment.transfer_testdata.storage_to_table`
      field: lines
      type: number
      operator: '=='
      value: 6000
    - field: ids
      type: number
      operator: '=='
      value: 3000
    - field: indexes
      type: number
      operator: '=='
      value: 3000
    - field: dates
      type: number
      operator: '=='
      value: 2
```
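Only the first list entry carries the `query`; the entries after it apply additional checks to other fields (`ids`, `indexes`, `dates`) of the same single-row result.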