Workflows · From Tasks

From tasks handle all data imports into your data lake. You can configure the items below; click on a name for more info.

Extract

The `extract` part of the configuration covers the extract step of a task. The vocabulary of the datasource is leading in this part: where Workflows normally talks about fields and records, it will use columns and rows here if the datasource uses those.

All from tasks come with pre-formatted schemas where possible; examples are from_facebook and from_xandr. For tasks where a pre-formatted schema is not possible, we give you the opportunity to create one yourself. Examples of these tasks are from_imap (which imports mail attachments) and from_aws_s3 (which imports files from Amazon S3).

New task types are added to Workflows on a monthly basis. Currently Workflows supports the following from tasks:

  1. from_appfigures
  2. from_apple_app_store_connect
  3. from_aws_s3
  4. from_bigquery
  5. from_bing
  6. from_bluesky
  7. from_dcm
  8. from_dcbm
  9. from_facebook
  10. from_ftp
  11. from_google_ads
  12. from_google_analytics
  13. from_google_analytics_management
  14. from_google_drive
  15. from_google_search_console
  16. from_google_sheets
  17. from_imap
  18. from_linkedin
  19. from_looker
  20. from_url
  21. from_salesforce
  22. from_snowflake
  23. from_x
  24. from_xandr

File

The `file` part of the configuration contains the definition of the data file being downloaded. Use it for from tasks where the format is not pre-defined, e.g. with from_aws_s3 to download a file.

Usage

file:
    type: txt
    transforms:
        - type: uncompress
    delimiter: ;
    encoding: UTF-8
    text_enclosure: '"'
    new_line: '\n'
    escape_char: '\'
    has_header_row: yes
    skip_rows: 9
    skip_bottom_rows: 8

Properties

property | type | required | description
--- | --- | --- | ---
type | enumerator (txt, json, xml, parquet, avro) | yes | File type. Use txt for delimited files, json for newline-delimited JSON, xml for XML, parquet for Parquet files and avro for Avro files.
node | string | yes | Only when type=xml. Node to take as the root node.
delimiter | char | no | Only when type=txt. Default is comma (,). Contains the char that delimits the fields.
encoding | enumerator (valid encoding type) | no | Only when type=txt. Default is UTF-8. Use when you need a different encoding type.
has_header_row | yesno (boolean) | no | Use when type=txt. Default is no. Whether the file contains a header row.
text_enclosure | char | no | Use when type=txt. Default is ". Quoting character to enclose fields.
new_line | char | no | Use when type=txt. Default is \n. Newline character. Use \n for Unix-based systems, or \r\n for Windows-based systems.
escape_char | char | no | Use when type=txt. Default is \. Escape character used in text files.
quoting | enumerator (minimal, all, nonnumeric, none) | no | Use when type=txt. Default is minimal. How verbose is the quoting in the file? all = all fields, nonnumeric = only non-numeric fields, minimal = only when needed (default), none = never use quotes.
skip_rows | int (>= 0) | no | Default is 0. Number of rows to skip at the top of the file. Happens before has_header_row is applied.
skip_bottom_rows | int (>= 0) | no | Default is 0. Number of rows to skip at the bottom of the file.

Transforms

The `transforms` part of the configuration enables you to manipulate data before you load it into your data lake. Add the transforms as an array.

Example usage

transforms:
    # Field transforms
    - type: fields_to_snake_case
    - type: fields_to_lower_case
    - type: fields_regexp_and_replace
      search: '_'
      replace: ''

Transform types

Below are the available transform types, with an explanation per transform on how to use it. Add the transforms as an array.

transform: add_current_date_field

Adds the current date (trigger_date) as a field.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to add_current_date_field
field | string | yes | Field name of the new column, e.g. trigger_date
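A minimal sketch of this transform inside the transforms array, using the trigger_date field name from the table above:

transforms:
    - type: add_current_date_field
      field: trigger_date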

transform: convert_type

Converts the type of a field. Only useful with BigQuery when no schema is set.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to convert_type
fields | array of strings | yes | Array of field names
to | enumerator (supported field type of BigQuery) | yes | Type the field should be transformed to
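A minimal sketch, assuming hypothetical field names clicks and impressions that should be converted to the BigQuery INTEGER type:

transforms:
    - type: convert_type
      fields:
        - clicks
        - impressions
      to: INTEGER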

transform: rename_field

Rename a field

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to rename_field
from | string | yes | Current name of the field
to | string | yes | Name the field should be renamed to
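A minimal sketch, renaming a hypothetical field Date to date:

transforms:
    - type: rename_field
      from: Date
      to: date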

transform: copy_field

Duplicate a field.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to copy_field
from | string | yes | Current name of the field
to | string | yes | Name the field should be duplicated to
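A minimal sketch, copying a hypothetical field date into a new field report_date:

transforms:
    - type: copy_field
      from: date
      to: report_date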

transform: drop_fields

Drops fields.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to drop_fields
fields | array of strings | yes | Array of field names that should be dropped.
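A minimal sketch, dropping two hypothetical fields:

transforms:
    - type: drop_fields
      fields:
        - internal_id
        - debug_info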

transform: encrypt

Encrypt field content. Support for several hash algorithms.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to encrypt
encryption | enumerator (md5, sha1, sha224, sha256, sha384, sha512) | yes | Hash algorithm that should be used to hash the content of a field.
fields | array of strings | yes | Array of field names that should be encrypted.
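A minimal sketch, hashing a hypothetical email field with sha256:

transforms:
    - type: encrypt
      encryption: sha256
      fields:
        - email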

transform: fields_search_and_replace

Search and replace a field name.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to fields_search_and_replace
search | string | yes | String to search.
replace | string | yes | String to replace search string with.

transform: fields_to_snake_case

Transform field names to snake_case. E.g. OneSecondBefore to one_second_before. Also replaces the field names in a schema.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to fields_to_snake_case

transform: fields_to_lower_case

Transform field names to lower case. E.g. ONESECONDBEFORE to onesecondbefore. Also replaces the field names in a schema.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to fields_to_lower_case

transform: fields_regexp_and_replace

Regular expression replacement of field names.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to fields_regexp_and_replace
search | string | yes | Regular expression to search.
replace | string | yes | Regular expression to replace found string with.

transform: filter_records

Filter records from incoming dataset.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to filter_records
fields | array of strings | yes | Array of field names on which the filter must be applied.
value | string | yes | If a field contains this value, the whole record will be filtered.
filter | enumerator (include, exclude) | yes | Use include if you want all records to be included when the field(s) contain the value. Use exclude if you want all records to be excluded when the field(s) contain the value.
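A minimal sketch, excluding every record where a hypothetical status field contains the value deleted:

transforms:
    - type: filter_records
      fields:
        - status
      value: deleted
      filter: exclude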

transform: find_all

Find all occurrences of a regular expression and return an array of matches. Based on Python's re.findall

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to find_all
fields | array of strings | yes | Array of field names on which the transform must be applied.
search | string | yes | Regular expression to search for in the field(s).
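A minimal sketch, extracting all hashtags from a hypothetical message field:

transforms:
    - type: find_all
      fields:
        - message
      search: '#\w+'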

transform: find_first

Find the first occurrence of a regular expression and return the match. Based on Python's re.findall.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to find_first
fields | array of strings | yes | Array of field names on which the transform must be applied.
search | string | yes | Regular expression to search for in the field(s).

transform: regexp_and_replace

Regular expression replacement of field data.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to regexp_and_replace
fields | array of strings | yes | Array of field names on which the transform must be applied.
search | string | yes | Regular expression to search.
replace | string | yes | Regular expression to replace found string with.

transform: replace_with_null

Conditionally replace field data with null value.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to replace_with_null
fields | array of strings | yes | Array of field names on which the transform must be applied.
values | array of strings | yes | If the field value matches any of the strings in this array, the field will be replaced with a null value.
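A minimal sketch, replacing placeholder values in a hypothetical revenue field with null:

transforms:
    - type: replace_with_null
      fields:
        - revenue
      values:
        - ''
        - 'N/A'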

transform: search_and_replace

Search and replace a substring in field data.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to search_and_replace
search | string | yes | String to search for in the field(s).
fields | array of strings | yes | Array of field names on which the transform must be applied.

transform: strptime

Parse a field value as a time string. Uses Python's datetime.strptime.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to strptime
format | string | yes | Format of the string that should be parsed as a date & time. Use these format codes.
fields | array of strings | yes | Array of field names that should be parsed.
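A minimal sketch, parsing a hypothetical date field that is formatted like 27/09/2021:

transforms:
    - type: strptime
      format: '%d/%m/%Y'
      fields:
        - date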

Schema

Don't use a schema for the APIs that use pre-formatted schemas; the extract section of the task type you chose contains this information. The `schema` part of the configuration is used to create the destination table. It is required for Snowflake, but optional for BigQuery.

Example usage

schema:
    description: You can put a table description here
    time_partitioning:
        field: date_field
        type: day
        require_partition_filter: yes
    fields:
        - name: date_field
          type: DATE
          mode: REQUIRED
          description: Description for date_field will be visible as field comment. Field will be used for partitioning in BigQuery and clustering in Snowflake.
        - name: some_string_field
          type: STRING
          mode: NULLABLE
          description: Description for some_string_field will be visible as field comment.
        - name: some_int_field
          type: INTEGER
          mode: NULLABLE
          description: Description for some_int_field will be visible as field comment.
        - name: some_repeated_field
          type: OBJECT
          mode: REPEATED
          description: Description for some_repeated_field will be visible as field comment.
          fields:
            - name: date_field
              type: DATE
              mode: REQUIRED
              description: Description for the nested date_field will be visible as field comment.
            - name: some_string_field
              type: STRING
              mode: NULLABLE
              description: Description for some_string_field will be visible as field comment.

Properties of fields

Required. Array of table fields.

property | type | required | Snowflake support | BigQuery support | description
--- | --- | --- | --- | --- | ---
name | string | yes | yes | yes | Field name. Use either valid names for BigQuery or Snowflake.
type | enumerator (valid data type) | yes | yes | yes | SQL data type of the field. Use only valid types of BigQuery or Snowflake.
mode | enumerator (nullable, required, repeated) | no | no | yes | BigQuery only. Define the mode of this field. Nullable = may be NULL, required = may not be NULL, repeated = array.
description | string | no | yes | yes | Description of the table field. Is added in Snowflake as a column comment during table creation. Is updated with every load in BigQuery.
fields | array of fields | no | no | yes | Contains nested fields. Use mode=repeated if this should be an array of fields. Use the same format for fields as described here.

Properties of time_partitioning

Optional. Time partitioning covers either clustering (for Snowflake) or time partitioning (for BigQuery).

property | type | required | Snowflake support | BigQuery support | description
--- | --- | --- | --- | --- | ---
field | string | yes | yes | yes | Field name. Must be present in the field definitions.
type | enumerator (hour, day, month, year) | yes | yes | yes | Timeframe of the partition. Default is day. Per hour, day, month or year.
require_partition_filter | yesno (boolean) | yes | no | yes | Default is no. BigQuery only. If the partition filter should be required in your SQL.

Other properties

Extra properties that can be added during table creation.

property | type | Snowflake support | BigQuery support | description
--- | --- | --- | --- | ---
description | string | yes | yes | Table description. Is added during table creation and update. In Snowflake the description is added to the COMMENT and suffixed with the task TAGS that last touched the table.

Deduplicate

Optional. Deduplicate the destination table to make sure you don't import the same data twice. Configure it once and it deduplicates automatically. If no deduplication is added, all imported data will be added to the destination table.

You can deduplicate a storage or table destination. Click on a deduplicate type for an explanation.

deduplicate: storage

Below are the deduplicate types for a storage destination, with an explanation per type.

Example usage

deduplicate:
    type: delete_all

deduplicate storage: none

Default setting. No deduplication. Adds all files to the destination storage location. Overwrites if the source files have the same name as the destination files.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to none

deduplicate storage: delete_all

Deletes all files in the destination storage location.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to delete_all

deduplicate: table

Below are the deduplicate types for a table destination, with an explanation per type.

Example usage

deduplicate:
    type: date_range
    field: booking_date

deduplicate: append

Default setting. No deduplication. Appends all data to the destination table.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to append

deduplicate: drop

Preferred over type=truncate. Drops the destination table and recreates it (with the latest fields and descriptions) before importing new data.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to drop
with_loop | enumerator (all, once) | yes | Only needed when task.loop_by is set. Use all to drop and recreate the table with every loop. Use once to only drop and recreate the table with the first loop.
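A minimal sketch of this deduplicate type combined with task.loop_by, dropping and recreating the table only with the first loop:

deduplicate:
    type: drop
    with_loop: once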

deduplicate: truncate

Prefer type=drop over truncate. Truncates (or creates, if it does not exist) the destination table before importing new data.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to truncate
with_loop | enumerator (all, once) | yes | Only needed when task.loop_by is set. Use all to truncate the table with every loop. Use once to only truncate the table with the first loop.

deduplicate: date_range

Deletes the date range from deduplicate.start_date (or task.start_date) till deduplicate.end_date (or task.end_date), inclusive, before importing new data. Currently only supports deletion by days. Contact support if you want a different timeframe, e.g. hour.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to date_range
field | string | yes | Name of the date field.

deduplicate: keys

Effectively a merge of the imported dataset into the destination table: it first deletes all records in the destination table whose key value(s) are present in the imported dataset, then appends the imported dataset.

property | type | required | description
--- | --- | --- | ---
type | string | yes | Set to keys
field | string | yes | Name of the key field.
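A minimal sketch, deduplicating on a hypothetical key field order_id:

deduplicate:
    type: keys
    field: order_id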

Load

Optional. Configures where in the data lake the imported data should be stored.

Load types

Below are the types of load. Click on a load type for an explanation.

load: database (BigQuery)

Load imported data into a BigQuery table.

Example usage

Example of loading the data files to table `my-data-lake.osb_raw.weather-data`

load:
    destination: database
    conn_id: google_cloud_default
    project_id: my-data-lake
    dataset_id: osb_raw
    table_id: weather-data

property | type | required | description
--- | --- | --- | ---
destination | enumerator (database, storage) | no | Default is database.
conn_id | string | no | Name of the connection. If not declared, client_cloud.db_conn_id is used.
project_id | string | no | Project ID of the destination table. If not declared, client_cloud.project_id is used.
dataset_id | string | no | Dataset ID of the destination table. If not declared, client_cloud.dataset_id is used.
table_id | string | no | Table ID of the destination table. If not declared, task.id is used.
table_expiration | (relative) date or datetime | no | Sets the expiration date of the table. If not declared, client_cloud.table_expiration is used. Set it to None if you don't want a default table_expiration, e.g. for your production tables. Recommended to add this to the sandbox files to make sure your sandbox tables will be deleted automatically. Besides absolute dates (like 2021-09-27), you can use relative dates as well.
max_bad_records | integer (>= 0) | no | Maximum amount of bad records before the load operation fails. Must be an integer of 0 or greater.
autodetect_schema | yesno (boolean) | no | BigQuery can detect the schema of a file being loaded. Is automatically set to yes if no schema is available.

load: database (Snowflake)

Load imported data into a Snowflake table.

Example usage

Example of loading the data files to table PRODUCTION.OSB_RAW.WEATHER_DATA

load:
    destination: database
    conn_id: snowflake
    database: PRODUCTION
    schema: OSB_RAW
    table: WEATHER_DATA

property | type | required | description
--- | --- | --- | ---
destination | enumerator (database, storage) | no | Default is database.
conn_id | string | no | Name of the connection. If not declared, client_cloud.db_conn_id is used.
database | string | no | Database of the destination table. If not declared, client_cloud.database is used.
schema | string | no | Schema of the destination table. If not declared, client_cloud.schema is used.
table | string | no | Table of the destination table. If not declared, task.id.upper() is used.
max_bad_records | integer (>= 0) | no | Maximum amount of bad records before the load operation fails. Must be an integer of 0 or greater.

load: storage

Load imported data to Amazon S3, Google Cloud Storage or Azure Blob Storage (in beta).

Example usage

Example of loading the data files to s3://my-test-bucket/some/folder

load:
    destination: storage
    conn_id: s3
    bucket: my-test-bucket
    folder: some/folder

property | type | required | description
--- | --- | --- | ---
destination | enumerator (database, storage) | yes | Set to storage
conn_id | string | no | Name of the connection. If not declared, client_cloud.storage_conn_id is used.
bucket | string | no | Name of the bucket. If not declared, client_cloud.bucket is used.
location | string | no | Name of the location. Please add the extension as well. If not declared, client_cloud.folder is used.
compression | enumerator (gzip, deflate) | no | Compression type
export_format | enumerator (json, csv, avro) | no | File format
project_id | string | no | BigQuery only. Project ID of the bucket. If not declared, client_cloud.project_id is used.

Validate

The `validate` part of the configuration enables you to check the data you just imported. If the imported data does not pass the validation, an error is generated.

Validation types

Below are the validation types, with an explanation per validation on how to use it. Add the validations as an array.

Special property: query

Set this property to the query you want to execute. Make sure the resultset is flattened (only rows and columns, no nested or repeated columns). The query property is required for the first validation, but optional for all subsequent validations. If you don't add a query to a subsequent validation, Workflows reuses the same resultset for that validation until it hits another query property. If you do add a query property to a subsequent validation, Workflows executes the query and uses the resultset in that validation and the subsequent ones, until it hits another query property again. You are allowed to use SQL Templating.
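A minimal sketch of how the resultset is reused, assuming hypothetical field names: the first validation declares a query, the second validation reuses that resultset, and the third declares a new query of its own.

validate:
    - query: |
              SELECT my_segment, COUNT(1) AS lines
              FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
              GROUP BY 1
      type: total_rows
      operator: '>'
      value: 0
    # No query property: reuses the resultset of the query above
    - type: number
      operator: '>'
      value: 100
      field: lines
    # New query property: used for this and all subsequent validations
    - query: |
              SELECT COUNT(1) > 0 AS has_rows
              FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
      type: yesno
      field: has_rows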

Validate: number

Validates a number field.

Example usage

The example validates whether the value of column lines is greater than (>) 1000 in all rows of the resultset. The example uses SQL Templating, hence the {{ and }}.

validate:
    - query: |
              SELECT my_segment, COUNT(1) AS lines
              FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
              GROUP BY 1
      type: number
      operator: '>'
      value: 1000
      field: lines

property | type | required | description
--- | --- | --- | ---
query | string | required for first validation | Query to execute. See special property: query for more info.
type | string | yes | Set to number
field | string | yes | Field name that will be validated
operator | enumerator (<=, <, ==, >, >=) | yes | Operator to use in the validation
value | integer or float | yes | Value to match the field against.

Validate: total_rows

Validates the number of rows of a resultset.

Example usage

The example validates whether the number of rows in the resultset equals (==) eight. The example uses SQL Templating, hence the {{ and }}.

validate:
    - query: |
              SELECT my_segment, COUNT(1) AS transactions
              FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
              GROUP BY 1
      type: total_rows
      operator: '=='
      value: 8

property | type | required | description
--- | --- | --- | ---
query | string | required for first validation | Query to execute. See special property: query for more info.
type | string | yes | Set to total_rows
operator | enumerator (<=, <, ==, >, >=) | yes | Operator to use in the validation
value | integer or float | yes | Value to match the field against.

Validate: yesno

Validates if the value of a field is yes (True) in all rows.

Example usage

The example validates whether the value of the field has_enough_transactions is yes (True). The example uses SQL Templating, hence the {{ and }}.

validate:
    - query: |
              SELECT COUNT(1) > 1000 AS has_enough_transactions
              FROM `{{ load.project_id }}.{{ load.dataset_id }}.{{ load.table_id }}`
      type: yesno
      field: has_enough_transactions

property | type | required | description
--- | --- | --- | ---
query | string | required for first validation | Query to execute. See special property: query for more info.
type | string | yes | Set to yesno
field | string | yes | Field name of which the value must be yes (True)