PostgreSQL table function

BETA

The Tinybird postgresql table function is currently in public beta.

The Tinybird postgresql() table function allows you to read data from your existing PostgreSQL database into Tinybird, then schedule a regular Copy Pipe to orchestrate synchronization. You can load full tables, and every run performs a full replace on the Data Source.

To use it, define a Node using standard SQL and the postgresql function keyword, then publish the Node as a Copy Pipe that does a sync on every run.

Set up

Prerequisites

Your postgres database needs to be open and public (exposed to the internet, with publicly-signed certs), so you can connect it to Tinybird via the hostname and port using your username and password.

You'll also need familiarity with making cURL requests to manage your secrets.

Type support and inference

Here's a detailed conversion table:

PostgreSQL data typeTinybird data type
BOOLEANUInt8 or Bool
SMALLINTInt16
INTEGERInt32
BIGINTInt64
REALFloat32
DOUBLE PRECISIONFloat64
NUMERIC or DECIMALDecimal(p, s)
CHAR(n)FixedString(n)
VARCHAR (n)String
TEXTString
BYTEAString
TIMESTAMPDateTime
TIMESTAMP WITH TIME ZONEDateTime (with appropriate timezone handling)
DATEDate
TIMEString (since there is no direct TIME type)
TIME WITH TIME ZONEString
INTERVALString
UUIDUUID
ARRAYArray(T) where T is the array element type
JSONString or JSON
JSONBString
INETString
CIDRString
MACADDRString
ENUMEnum8 or Enum16
GEOMETRYString

Notes:

  • Tinybird does not support all PostgreSQL types directly, so some types are mapped to String in Tinybird, which is the most flexible type for arbitrary data.
  • For the NUMERIC and DECIMAL types, Decimal(p, s) in Tinybird requires specifying precision (p) and scale (s).
  • Time zone support in Tinybird's DateTime can be managed via additional functions or by ensuring consistent storage and retrieval time zones.
  • Some types like INTERVAL do not have a direct equivalent in Tinybird and are usually stored as String or decomposed into separate fields.

About secrets

The Environment Variables API is currently only accessible at API level. UI support will be released in the near future.

Pasting your credentials into a Pipe Node or .datafile as plain text is a security risk. Instead, use the Environment Variables API to create two new secrets for your postgres username and password. In the next step, you'll then be ready to interpolate your new secrets using the tb_secret function:

{{tb_secret('pg_username')}}
{{tb_secret('pg_password')}}

Load a PostgreSQL table

In the Tinybird UI, create a new Pipe Node. Call the postgresql table function and pass the hostname & port, database, table, user, and password:

Example Node logic with actual values
SELECT *
FROM postgresql(
  'aws-0-eu-central-1.TODO.com:3866',
  'postgres',
  'orders',
  {{tb_secret('pg_username')}},
  {{tb_secret('pg_password')}},
)

Publish this Node as a Copy Pipe, thereby running the query manually. You can choose to append only new data, or replace all data.

Alternative: Use datafiles

As well as using the UI, you can also define Node logic in Pipe .datafile files. An example for an ecommerce orders_backfill scenario, with a Node called all_orders, would be:

NODE all_orders
SQL >

    %
    SELECT *
    FROM postgresql(
      'aws-0-eu-central-1.TODO.com:3866',
      'postgres',
      'orders',
      {{tb_secret('pg_username')}},
      {{tb_secret('pg_password')}},
    )

TYPE copy
TARGET_DATASOURCE orders
COPY_SCHEDULE @on-demand
COPY_MODE replace

Include filters

You can use a source column in postgres and filter by a value in Tinybird, for example:

Example Copy Pipe with postgresql function and filters
SELECT *
FROM postgresql(
  'aws-0-eu-central-1.TODO.com:3866',
  'postgres',
  'orders',
  {{tb_secret('pg_username')}},
  {{tb_secret('pg_password')}},
  )
WHERE orderDate > (select max(orderDate) from orders)

Schedule runs

When publishing as a Copy Pipe, most users set it to run at a frequent interval using a cron expression.

It's also possible to trigger manually:

curl -H "Authorization: Bearer <PIPE:READ token>" \
    -X POST "https:/tinybird.co/api/v0/pipes/<pipe_id>/run"

Having manual Pipes in your Workspace is helpful, as you can run a full sync manually any time you need it - sometimes delta updates are not 100% accurate. Some users also leverage them for weekly full syncs.

Synchronization strategies

When copying data from PostgreSQL to Tinybird you can use one of the following strategies:

  • Use COPY_MODE replace to synchronize small dimensions tables, up to a few million rows, in a frequent schedule (1 to 5 minutes).
  • Use COPY_MODE append to do incremental appends. For example, you can append events data tagged with a timestamp. Combine it with COPY_SCHEDULE and filters in the Copy Pipe SQL to sync the new events.

Timeouts

When synchronizing dimensions tables with COPY_MODE replace and 1 minute schedule, the copy job might timeout because it can't ingest the whole table in the defined schedule.

Timeouts depend on several factors:

  • The statement_timeout configured in PostgreSQL.
  • The PostgreSQL database load.
  • Network connectivity, for example when copying data from different cloud regions.

Follow these steps to avoid timeouts using incremental appends:

  1. Make sure your PostgreSQL dimensions rows are tagged with an updated timestamp.

Use the column to filter the copy Pipe SQL. In the following example, the column is updated_at:

CREATE TABLE users (
    created_at TIMESTAMPTZ(6) NOT NULL,
    updated_at TIMESTAMPTZ(6) NOT NULL,
    name TEXT,
    user_id TEXT PRIMARY KEY
);
  1. Create the target Data Source as a ReplacingMergeTree using a unique or primary key as the ENGINE_SORTING_KEY in the Postgres table. Rows with the same ENGINE_SORTING_KEY are deduplicated.
SCHEMA >
    `created_at` DateTime64(6),
    `updated_at` DateTime64(6),
    `name` String,
    `user_id` String

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "user_id"
  1. Configure the Copy Pipe with an incremental append strategy and 1 minute schedule. That way you make sure only new records in the last minute are ingested, thus optimizing the copy job duration.
NODE copy_pg_users_rmt_0
SQL >

    %
        SELECT *
        FROM
            postgresql(
                'aws-0-eu-central-1.TODO.com:6543',
                'postgres',
                'users',
                {{ tb_secret('pg_username') }},
                {{ tb_secret('pg_password') }}
            )
        WHERE
            updated_at
            > (SELECT max(updated_at) FROM pg_users_rmt)::String

TYPE copy
TARGET_DATASOURCE pg_users_rmt
COPY_MODE append
COPY_SCHEDULE * * * * *

Optionally, you can create an index in the PostgreSQL table to speed up filtering:

-- Create an index on updated_at for faster queries
CREATE INDEX idx_updated_at ON users (updated_at);
  1. A Data Source with ReplacingMergeTree engine deduplicates records based on the sorting key in batch mode. As you can't ensure when deduplication is going to happen, use the FINAL keyword when querying the Data Source to force deduplication at query time.
SELECT * FROM pg_users FINAL
  1. You can combine this approach with an hourly or daily replacement to get rid of deleted rows. Learn about how to handle deleted rows when using ReplacingMergeTree.

Observability

Job executions are logged in the datasources_ops_log Service Data Source. This log can be checked directly in the Data Source view page in the UI. Filter by datasource_id to monitor ingestion through the PostgreSQL table function from the datasources_ops_log:

Example query to the datasources_ops_log Service Data Source
SELECT
  timestamp,
  event_type,
  result,
  error,
  job_id
FROM
  tinybird.datasources_ops_log
WHERE
  datasource_id = 't_1234'
AND
  event_type = 'copy'
ORDER BY timestamp DESC

Limits

The table function inherits all the limits of Copy Pipes.

Secrets are created at a Workspace level, so you will be able to connect one PostgreSQL database per Tinybird Workspace.

Check the limits page for limits on ingestion, queries, API Endpoints, and more.

Billing

When set up, this functionality is a Copy Pipe with a query (processed data). There are no additional or specific costs for the table function itself. See the billing docs for more information on data operations and how they're charged.

Was this page helpful?
Updated