Kafka

Publisher: cloudquery
Repository: github.com
Latest version: v5.1.1
Type: Destination
Price: Free

Overview #

Kafka Destination Plugin

This destination plugin lets you sync data from a CloudQuery source to Kafka in CSV, JSON, or Parquet format. Each table is pushed to a separate topic.

Example #

This example connects to a Kafka destination using SASL/PLAIN authentication and pushes messages in JSON format.
The (top level) spec section is described in the Destination Spec Reference.
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.1.1"
  write_mode: "append"
  spec:
    # required - list of brokers to connect to
    brokers: ["<broker-host>:<broker-port>"]
    # optional - if connecting via SASL/PLAIN, the username and password to use. If not set, no authentication will be used.
    sasl_username: "${KAFKA_SASL_USERNAME}"
    sasl_password: "${KAFKA_SASL_PASSWORD}"
    format: "json" # options: parquet, json, csv
    format_spec:
      # CSV specific parameters:
      # delimiter: ","
      # skip_header: false
      # Parquet specific parameters:
      # version: "v2Latest"
      # root_repetition: "repeated"

    # Optional parameters
    # compression: "" # options: gzip
    # verbose: false
    # batch_size: 1000
    # topic_details:
    #   num_partitions: 1
    #   replication_factor: 1
Note that the Kafka plugin only supports the append write_mode.

Plugin Spec #

This is the (nested) plugin spec:
  • brokers ([]string) (required)
    List of brokers to connect to.
    Example broker address:
    • "localhost:9092" (default address of a local Kafka broker)
  • format (string) (required)
    Format of the messages written to Kafka. Supported values are csv, json, and parquet.
  • format_spec (format_spec) (optional)
    Optional parameters to adjust the selected format.
  • compression (string) (optional) (default: empty)
    Compression algorithm to use. Supported values are empty or gzip. Not supported for parquet format.
  • sasl_username (string) (optional) (default: empty)
    If connecting via SASL/PLAIN, the username to use.
  • sasl_password (string) (optional) (default: empty)
    If connecting via SASL/PLAIN, the password to use.
  • verbose (boolean) (optional) (default: false)
    If true, the plugin will log all underlying Kafka client messages to the log.
  • batch_size (integer) (optional) (default: 1000)
    Number of records to write before starting a new object.
  • topic_details (topic_details) (optional)
    Optional parameters to set topic details.
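Since only brokers and format are marked as required above, a minimal configuration might look like the sketch below. It assumes a local broker at the default address with no authentication; adjust the address for your environment.

```yaml
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.1.1"
  write_mode: "append"
  spec:
    # Assumption: a local, unauthenticated broker on the default port
    brokers: ["localhost:9092"]
    # One of: csv, json, parquet
    format: "json"
```

All other options fall back to the defaults listed above.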
 

format_spec #

CSV
  • delimiter (string) (optional) (default: ,)
    Delimiter to use in the CSV file.
  • skip_header (boolean) (optional) (default: false)
    If set to true, the CSV file will not contain a header row as the first row.
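As an illustration of the CSV options, the sketch below writes semicolon-delimited output without a header row and enables gzip compression. The broker placeholder and the chosen delimiter are assumptions for the example, not recommended values.

```yaml
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.1.1"
  write_mode: "append"
  spec:
    brokers: ["<broker-host>:<broker-port>"]
    format: "csv"
    format_spec:
      # Illustrative choice; the default delimiter is ","
      delimiter: ";"
      # Omit the header row from the CSV output
      skip_header: true
    # gzip is supported for csv and json, not for parquet
    compression: "gzip"
```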
JSON
Reserved for future use.
Parquet
  • version (string) (optional) (default: v2Latest)
    Parquet format version to use. Supported values are v1.0, v2.4, v2.6 and v2Latest. v2Latest is an alias for the latest version available in the Parquet library which is currently v2.6.
    Useful when the reader consuming the Parquet files does not support the latest version.
  • root_repetition (string) (optional) (default: repeated)
    Repetition option to use for the root node. Supported values are undefined, required, optional and repeated.
    Some Parquet readers require a specific root repetition option to be able to read the file. For example, importing Parquet files into Snowflake requires the root repetition to be undefined.
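The Parquet options above could be combined as in the following sketch, which assumes a downstream reader that needs an older format version and an undefined root repetition (as in the Snowflake case mentioned above). The choice of v2.4 is purely illustrative.

```yaml
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.1.1"
  write_mode: "append"
  spec:
    brokers: ["<broker-host>:<broker-port>"]
    format: "parquet"
    format_spec:
      # Assumption: the consumer does not support the latest Parquet version
      version: "v2.4"
      # Required by some readers, for example when importing into Snowflake
      root_repetition: "undefined"
    # compression is left unset because it is not supported for parquet
```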
 

topic_details #

  • num_partitions (integer) (optional) (default: 1)
    Number of partitions for the newly created topic.
  • replication_factor (integer) (optional) (default: 1)
    Replication factor for the topic.
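For a multi-broker cluster you may want more than the defaults. The sketch below assumes a cluster with at least three brokers, since Kafka cannot create a topic whose replication factor exceeds the number of available brokers. The values are illustrative only.

```yaml
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.1.1"
  write_mode: "append"
  spec:
    brokers: ["<broker-host>:<broker-port>"]
    format: "json"
    topic_details:
      # Illustrative values; both default to 1
      num_partitions: 3
      # Assumption: the cluster has at least three brokers
      replication_factor: 3
```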


Confluent Cloud #

Connecting to Confluent Cloud #

Confluent Cloud is a fully managed data streaming platform that you can use as a destination with this plugin. You can get started with a time-limited trial at confluent.io.
To configure the CloudQuery Kafka plugin, create an API key in the Confluent Cloud Console.
Download the file containing the API key and secret, and use them for the sasl_username and sasl_password properties in the Kafka plugin configuration. The file also contains the URL of the bootstrap server. Use it for the brokers property in the configuration:
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.1.1"
  write_mode: "append"
  spec:
    # required - list of brokers to connect to
    brokers: ["${CONFLUENT_BOOTSTRAP_SERVER}"]
    sasl_username: "${CONFLUENT_KEY}"
    sasl_password: "${CONFLUENT_SECRET}"
    format: "json" # options: parquet, json, csv
    format_spec:
      # CSV-specific parameters:
      # delimiter: ","
      # skip_header: false

    # Optional parameters
    # compression: "" # options: gzip
    # verbose: false
    # batch_size: 1000
    topic_details:
      num_partitions: 1
      replication_factor: 1

Creating a scoped key with granular access at Confluent Cloud #

If you need to limit the access of the CloudQuery Kafka plugin, you can create an API key with granular access. To do this, set the following ACLs on the API key:
Cluster ACLs
| Operation | Permission |
| --- | --- |
| CREATE | ALLOW |
| DESCRIBE | ALLOW |
Topic ACLs
For topic name, use the prefixes of the tables from the selected source. The table below specifies permissions for topics created by the AWS source plugin:
| Topic name | Pattern type | Operation | Permission |
| --- | --- | --- | --- |
| aws_ | PREFIXED | WRITE | ALLOW |
| aws_ | PREFIXED | CREATE | ALLOW |
| aws_ | PREFIXED | DESCRIBE | ALLOW |