Kafka (Official)

Publisher: cloudquery
Repository: github.com
Latest version: v5.4.2
Type: Destination
Price: Free

Overview #

Kafka Destination Plugin

This destination plugin lets you sync data from a CloudQuery source to Kafka in various formats, such as CSV and JSON. Each table is pushed to a separate topic.

Example #

This example configures a Kafka destination, connecting via SASL/PLAIN authentication and pushing messages in JSON format.
The (top-level) spec section is described in the Destination Spec Reference.
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.4.2"
  write_mode: "append"
  spec:
    # required - list of brokers to connect to
    brokers: ["<broker-host>:<broker-port>"]
    # optional - if connecting via SASL/PLAIN, the username and password to use. If not set, no authentication will be used.
    sasl_username: "${KAFKA_SASL_USERNAME}"
    sasl_password: "${KAFKA_SASL_PASSWORD}"
    format: "json" # options: parquet, json, csv
    format_spec:
      # CSV specific parameters:
      # delimiter: ","
      # skip_header: false
      # Parquet specific parameters:
      # version: "v2Latest"
      # root_repetition: "repeated"
      # max_row_group_length: 134217728 # 128 * 1024 * 1024

    # Optional parameters
    # compression: "" # options: gzip
    # verbose: false
    # batch_size: 1000
    # topic_details:
      # num_partitions: 1
      # replication_factor: 1
Note that the Kafka plugin only supports the append write_mode.

Plugin Spec #

This is the (nested) plugin spec:
  • brokers ([]string) (required)
    List of brokers to connect to.
    Example broker address:
    • "localhost:9092" default URL for a local Kafka broker
  • format (string) (required)
    Format of the output file. Supported values are csv, json and parquet.
  • format_spec (format_spec) (optional)
    Optional parameters to change the format of the file.
  • compression (string) (optional) (default: empty)
    Compression algorithm to use. Supported values are empty or gzip. Not supported for parquet format.
  • sasl_username (string) (optional) (default: empty)
    If connecting via SASL/PLAIN, the username to use.
  • sasl_password (string) (optional) (default: empty)
    If connecting via SASL/PLAIN, the password to use.
  • enforce_tls_verification (boolean) (optional) (default: false)
    If true, the plugin will verify the TLS certificate of the Kafka broker.
  • verbose (boolean) (optional) (default: false)
    If true, the plugin will log all underlying Kafka client messages to the log.
  • batch_size (integer) (optional) (default: 1000)
    Number of records to write before starting a new object.
  • topic_details (topic_details) (optional)
    Optional parameters to set topic details.
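For reference, a minimal configuration that sets only the required options might look like the following sketch; everything else falls back to the defaults listed above, and the local broker address is a placeholder:
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.4.2"
  write_mode: "append" # the only supported write mode
  spec:
    # required - list of brokers to connect to
    brokers: ["localhost:9092"]
    # required - output format
    format: "json"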

format_spec #

CSV
  • delimiter (string) (optional) (default: ,)
    Delimiter to use in the CSV file.
  • skip_header (boolean) (optional) (default: false)
    If set to true, the CSV file will not include a header row.
JSON
Reserved for future use.
Parquet
  • version (string) (optional) (default: v2Latest)
    Parquet format version to use. Supported values are v1.0, v2.4, v2.6 and v2Latest. v2Latest is an alias for the latest version available in the Parquet library which is currently v2.6.
    Useful when the reader consuming the Parquet files does not support the latest version.
  • root_repetition (string) (optional) (default: repeated)
    Repetition option to use for the root node. Supported values are undefined, required, optional and repeated.
    Some Parquet readers require a specific root repetition option to be able to read the file. For example, importing Parquet files into Snowflake requires the root repetition to be undefined (see the sketch after this list).
  • max_row_group_length (integer) (optional) (default: 134217728 (= 128 * 1024 * 1024))
    The maximum number of rows in a single row group. Use a lower number to reduce memory usage when reading the Parquet files, and a higher number to increase the efficiency of reading the Parquet files.
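As an example of the Parquet options above, the nested spec below is a sketch of a configuration whose output Snowflake can import, with root_repetition set to undefined; the broker address is a placeholder:
  spec:
    brokers: ["<broker-host>:<broker-port>"]
    format: "parquet"
    format_spec:
      version: "v2.6"
      # Snowflake requires the root repetition to be undefined
      root_repetition: "undefined"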

topic_details #

  • num_partitions (integer) (optional) (default: 1)
    Number of partitions for the newly created topic.
  • replication_factor (integer) (optional) (default: 1)
    Replication factor for the topic.
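Both values apply to topics the plugin creates. As a sketch, on a multi-broker production cluster you might raise the replication factor for durability; the value of 3 here is an illustrative assumption, and the factor cannot exceed the number of brokers in your cluster:
  spec:
    topic_details:
      num_partitions: 1
      replication_factor: 3 # must not exceed the broker count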


Confluent Cloud #

Connecting to Confluent Cloud #

Confluent Cloud is a fully managed data streaming platform that you can use as a destination with this plugin. You can get started with a time-limited trial at confluent.io.
To configure the CloudQuery Kafka plugin, you need to create an API key in the Confluent Cloud Console.
Download the file with the API key and secret and use its values for the sasl_username and sasl_password properties. The file also contains the URL of the bootstrap server; use that for the brokers property in the configuration:
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.4.2"
  write_mode: "append"
  spec:
    # required - list of brokers to connect to
    brokers: ["${CONFLUENT_BOOTSTRAP_SERVER}"]
    sasl_username: "${CONFLUENT_KEY}"
    sasl_password: "${CONFLUENT_SECRET}"
    format: "json" # options: parquet, json, csv
    format_spec:
      # CSV-specific parameters:
      # delimiter: ","
      # skip_header: false

    # Optional parameters
    # compression: "" # options: gzip
    # verbose: false
    # batch_size: 1000
    topic_details:
      num_partitions: 1
      replication_factor: 1

Creating a scoped key with granular access at Confluent Cloud #

If you need to limit the access of the CloudQuery Kafka plugin, you can create an API key with granular access. To do this, set the following ACLs on the API key:
Cluster ACLs

Operation   Permission
CREATE      ALLOW
DESCRIBE    ALLOW
Topic ACLs
For the topic name, use the prefixes of the tables from the selected source. The table below specifies permissions for topics created by the AWS source plugin:

Topic name   Pattern type   Operation   Permission
aws_         PREFIXED       WRITE       ALLOW
aws_         PREFIXED       CREATE      ALLOW
aws_         PREFIXED       DESCRIBE    ALLOW


Licenses #

The following tools / packages are used in this plugin:
Name  License
github.com/IBM/sarama  MIT
github.com/JohnCGriffin/overflow  MIT
github.com/adrg/xdg  MIT
github.com/andybalholm/brotli  MIT
github.com/apache/arrow/go/v13  Apache-2.0
github.com/apache/arrow-go/v18  Apache-2.0
github.com/apache/thrift/lib/go/thrift  Apache-2.0
github.com/apapsch/go-jsonmerge/v2  MIT
github.com/aws/aws-sdk-go-v2  Apache-2.0
github.com/aws/aws-sdk-go-v2/config  Apache-2.0
github.com/aws/aws-sdk-go-v2/credentials  Apache-2.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds  Apache-2.0
github.com/aws/aws-sdk-go-v2/internal/configsources  Apache-2.0
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2  Apache-2.0
github.com/aws/aws-sdk-go-v2/internal/ini  Apache-2.0
github.com/aws/aws-sdk-go-v2/internal/sync/singleflight  BSD-3-Clause
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding  Apache-2.0
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url  Apache-2.0
github.com/aws/aws-sdk-go-v2/service/licensemanager  Apache-2.0
github.com/aws/aws-sdk-go-v2/service/marketplacemetering  Apache-2.0
github.com/aws/aws-sdk-go-v2/service/sso  Apache-2.0
github.com/aws/aws-sdk-go-v2/service/ssooidc  Apache-2.0
github.com/aws/aws-sdk-go-v2/service/sts  Apache-2.0
github.com/aws/smithy-go  Apache-2.0
github.com/aws/smithy-go/internal/sync/singleflight  BSD-3-Clause
github.com/bahlo/generic-list-go  BSD-3-Clause
github.com/buger/jsonparser  MIT
github.com/cenkalti/backoff/v4  MIT
github.com/cloudquery/cloudquery-api-go  MPL-2.0
github.com/cloudquery/codegen/jsonschema  MPL-2.0
github.com/cloudquery/plugin-pb-go  MPL-2.0
github.com/cloudquery/plugin-sdk/v2/internal/glob  MIT
github.com/cloudquery/plugin-sdk/v2/schema  MIT
github.com/cloudquery/plugin-sdk/v2/types  MPL-2.0
github.com/cloudquery/plugin-sdk/v4  MPL-2.0
github.com/cloudquery/plugin-sdk/v4/glob  MIT
github.com/cloudquery/plugin-sdk/v4/scalar  MIT
github.com/davecgh/go-spew/spew  ISC
github.com/eapache/go-resiliency/breaker  MIT
github.com/eapache/go-xerial-snappy  MIT
github.com/eapache/queue  MIT
github.com/ghodss/yaml  MIT
github.com/go-logr/logr  Apache-2.0
github.com/go-logr/stdr  Apache-2.0
github.com/goccy/go-json  MIT
github.com/golang/snappy  BSD-3-Clause
github.com/google/flatbuffers/go  Apache-2.0
github.com/google/uuid  BSD-3-Clause
github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors  Apache-2.0
github.com/grpc-ecosystem/grpc-gateway/v2  BSD-3-Clause
github.com/hashicorp/errwrap  MPL-2.0
github.com/hashicorp/go-cleanhttp  MPL-2.0
github.com/hashicorp/go-multierror  MPL-2.0
github.com/hashicorp/go-retryablehttp  MPL-2.0
github.com/hashicorp/go-uuid  MPL-2.0
github.com/invopop/jsonschema  MIT
github.com/jcmturner/aescts/v2  Apache-2.0
github.com/jcmturner/dnsutils/v2  Apache-2.0
github.com/jcmturner/gofork  BSD-3-Clause
github.com/jcmturner/gokrb5/v8  Apache-2.0
github.com/jcmturner/rpc/v2  Apache-2.0
github.com/klauspost/compress  Apache-2.0
github.com/klauspost/compress/internal/snapref  BSD-3-Clause
github.com/klauspost/compress/zstd/internal/xxhash  MIT
github.com/klauspost/cpuid/v2  MIT
github.com/mailru/easyjson  MIT
github.com/mattn/go-colorable  MIT
github.com/mattn/go-isatty  MIT
github.com/oapi-codegen/runtime  Apache-2.0
github.com/pierrec/lz4/v4  BSD-3-Clause
github.com/pmezard/go-difflib/difflib  BSD-3-Clause
github.com/rcrowley/go-metrics  BSD-2-Clause-FreeBSD
github.com/rs/zerolog  MIT
github.com/santhosh-tekuri/jsonschema/v6  Apache-2.0
github.com/spf13/cobra  Apache-2.0
github.com/spf13/pflag  BSD-3-Clause
github.com/stretchr/testify  MIT
github.com/thoas/go-funk  MIT
github.com/wk8/go-ordered-map/v2  Apache-2.0
github.com/zeebo/xxh3  BSD-2-Clause
go.opentelemetry.io/otel  Apache-2.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp  Apache-2.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp  Apache-2.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace  Apache-2.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp  Apache-2.0
go.opentelemetry.io/otel/log  Apache-2.0
go.opentelemetry.io/otel/metric  Apache-2.0
go.opentelemetry.io/otel/sdk  Apache-2.0
go.opentelemetry.io/otel/sdk/log  Apache-2.0
go.opentelemetry.io/otel/sdk/metric  Apache-2.0
go.opentelemetry.io/otel/trace  Apache-2.0
go.opentelemetry.io/proto/otlp  Apache-2.0
golang.org/x/crypto  BSD-3-Clause
golang.org/x/exp  BSD-3-Clause
golang.org/x/net  BSD-3-Clause
golang.org/x/sync/errgroup  BSD-3-Clause
golang.org/x/sys  BSD-3-Clause
golang.org/x/text  BSD-3-Clause
golang.org/x/xerrors  BSD-3-Clause
google.golang.org/genproto/googleapis/api/httpbody  Apache-2.0
google.golang.org/genproto/googleapis/rpc/status  Apache-2.0
google.golang.org/grpc  Apache-2.0
google.golang.org/protobuf  BSD-3-Clause
gopkg.in/yaml.v2  Apache-2.0
gopkg.in/yaml.v3  MIT

