Official
Kafka
Price
Free
Overview #
Kafka Destination Plugin
This destination plugin lets you sync data from a CloudQuery source to Kafka in various formats such as CSV and JSON. Each table is pushed to a separate topic.
Example #
This example connects to a Kafka destination using SASL/PLAIN authentication and pushes messages in JSON format.
The (top level) spec section is described in the Destination Spec Reference.
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.4.1"
  write_mode: "append"
  spec:
    # required - list of brokers to connect to
    brokers: ["<broker-host>:<broker-port>"]
    # optional - if connecting via SASL/PLAIN, the username and password to use. If not set, no authentication will be used.
    sasl_username: "${KAFKA_SASL_USERNAME}"
    sasl_password: "${KAFKA_SASL_PASSWORD}"
    format: "json" # options: parquet, json, csv
    format_spec:
      # CSV specific parameters:
      # delimiter: ","
      # skip_header: false
      # Parquet specific parameters:
      # version: "v2Latest"
      # root_repetition: "repeated"
      # max_row_group_length: 134217728 # 128 * 1024 * 1024
    # Optional parameters
    # compression: "" # options: gzip
    # verbose: false
    # batch_size: 1000
    # topic_details:
    #   num_partitions: 1
    #   replication_factor: 1
Note that the Kafka plugin only supports the append write_mode.
Plugin Spec #
This is the (nested) plugin spec.

brokers ([]string) (required)
List of brokers to connect to. Example broker address: "localhost:9092" (default URL for a local Kafka broker).

format (string) (required)
Format of the output file. Supported values are csv, json and parquet.

format_spec (format_spec) (optional)
Optional parameters to change the format of the file.

compression (string) (optional) (default: empty)
Compression algorithm to use. Supported values are empty or gzip. Not supported for parquet format.

sasl_username (string) (optional) (default: empty)
If connecting via SASL/PLAIN, the username to use.

sasl_password (string) (optional) (default: empty)
If connecting via SASL/PLAIN, the password to use.

enforce_tls_verification (boolean) (optional) (default: false)
If true, the plugin will verify the TLS certificate of the Kafka broker.

verbose (boolean) (optional) (default: false)
If true, the plugin will log all underlying Kafka client messages to the log.

batch_size (integer) (optional) (default: 1000)
Number of records to write before starting a new object.

topic_details (topic_details) (optional)
Optional parameters to set topic details.
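As an illustration of how these options combine, the following is a minimal sketch of a nested plugin spec that enables TLS certificate verification and gzip compression for CSV output. The broker placeholder and environment variable names are reused from the example above; the specific values (csv, gzip, a batch size of 500) are assumptions chosen for illustration, not recommendations.

```yaml
# Nested plugin spec only; this goes under the top-level destination spec.
spec:
  brokers: ["<broker-host>:<broker-port>"]
  sasl_username: "${KAFKA_SASL_USERNAME}"
  sasl_password: "${KAFKA_SASL_PASSWORD}"
  # Verify the TLS certificate of the Kafka broker (default: false).
  enforce_tls_verification: true
  format: "csv"
  # gzip is the only supported compression value; it is not supported for parquet.
  compression: "gzip"
  # Illustrative value; the default is 1000.
  batch_size: 500
```

Because compression is not supported for the parquet format, leave compression empty when format is set to parquet.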
format_spec #
CSV

delimiter (string) (optional) (default: ,)
Delimiter to use in the CSV file.

skip_header (boolean) (optional) (default: false)
If set to true, the CSV file will not contain a header row as the first row.

JSON

Reserved for future use.

Parquet

version (string) (optional) (default: v2Latest)
Parquet format version to use. Supported values are v1.0, v2.4, v2.6 and v2Latest. v2Latest is an alias for the latest version available in the Parquet library, which is currently v2.6. Useful when the reader consuming the Parquet files does not support the latest version.

root_repetition (string) (optional) (default: repeated)
Repetition option to use for the root node. Supported values are undefined, required, optional and repeated. Some Parquet readers require a specific root repetition option to be able to read the file. For example, importing Parquet files into Snowflake requires the root repetition to be undefined.

max_row_group_length (integer) (optional) (default: 134217728 (= 128 * 1024 * 1024))
The maximum number of rows in a single row group. Use a lower number to reduce memory usage when reading the Parquet files, and a higher number to increase the efficiency of reading the Parquet files.
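The Parquet options above could be combined as in the following sketch. It assumes the output will be imported into Snowflake (hence root_repetition set to undefined) by a reader that does not support the latest Parquet version; the version and row group length shown are illustrative assumptions only.

```yaml
# Fragment of the nested plugin spec.
format: "parquet"
format_spec:
  # Pin an older format version for readers that do not support v2.6.
  version: "v2.4"
  # Snowflake requires the root repetition to be undefined.
  root_repetition: "undefined"
  # Illustrative lower value (1024 * 1024) to reduce reader memory usage;
  # the default is 134217728.
  max_row_group_length: 1048576
```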
topic_details #
num_partitions (integer) (optional) (default: 1)
Number of partitions for the newly created topic.

replication_factor (integer) (optional) (default: 1)
Replication factor for the topic.
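For example, a sketch of topic_details for a cluster that is assumed to have at least three brokers might look like the following. The values are illustrative; Kafka rejects topic creation when the replication factor exceeds the number of available brokers, so adjust them to your cluster.

```yaml
# Fragment of the nested plugin spec.
topic_details:
  # Number of partitions for each newly created topic.
  num_partitions: 3
  # Must not exceed the number of brokers in the cluster.
  replication_factor: 3
```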
Confluent Cloud #
Connecting to Confluent Cloud #
Confluent Cloud is a fully managed data streaming platform that you can use as a destination with this plugin. You can get started with a time-limited trial at confluent.io.
To configure the CloudQuery Kafka plugin, you need to create an API key in the Confluent Cloud Console.
Download the file with the API key and secret and use them for the sasl_username and sasl_password properties of the Kafka plugin spec. The file will also contain the URL for the bootstrap server. Use that in the brokers property in the configuration:
kind: destination
spec:
  name: "kafka"
  path: "cloudquery/kafka"
  registry: "cloudquery"
  version: "v5.4.1"
  write_mode: "append"
  spec:
    # required - list of brokers to connect to
    brokers: ["${CONFLUENT_BOOTSTRAP_SERVER}"]
    sasl_username: "${CONFLUENT_KEY}"
    sasl_password: "${CONFLUENT_SECRET}"
    format: "json" # options: parquet, json, csv
    format_spec:
      # CSV-specific parameters:
      # delimiter: ","
      # skip_header: false
    # Optional parameters
    # compression: "" # options: gzip
    # verbose: false
    # batch_size: 1000
    topic_details:
      num_partitions: 1
      replication_factor: 1
Creating a scoped key with granular access at Confluent Cloud #
If you need to limit the access of the CloudQuery Kafka plugin, you can create an API key with granular access. To do this, set the following ACLs on the API key:
Cluster ACLs
Operation | Permission |
---|---|
CREATE | ALLOW |
DESCRIBE | ALLOW |
Topic ACLs
For topic name, use the prefixes of the tables from the selected source. The table below specifies permissions for topics created by the AWS source plugin:
Topic name | Pattern type | Operation | Permission |
---|---|---|---|
aws_ | PREFIXED | WRITE | ALLOW |
aws_ | PREFIXED | CREATE | ALLOW |
aws_ | PREFIXED | DESCRIBE | ALLOW |
Licenses #
The following tools / packages are used in this plugin:
Name | License |
---|---|
github.com/IBM/sarama | MIT |
github.com/JohnCGriffin/overflow | MIT |
github.com/adrg/xdg | MIT |
github.com/andybalholm/brotli | MIT |
github.com/apache/arrow/go/v13 | Apache-2.0 |
github.com/apache/arrow/go/v17 | Apache-2.0 |
github.com/apache/thrift/lib/go/thrift | Apache-2.0 |
github.com/apapsch/go-jsonmerge/v2 | MIT |
github.com/aws/aws-sdk-go-v2 | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/config | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/credentials | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/feature/ec2/imds | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/internal/configsources | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/internal/ini | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/internal/sync/singleflight | BSD-3-Clause |
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/service/licensemanager | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/service/marketplacemetering | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/service/sso | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/service/ssooidc | Apache-2.0 |
github.com/aws/aws-sdk-go-v2/service/sts | Apache-2.0 |
github.com/aws/smithy-go | Apache-2.0 |
github.com/aws/smithy-go/internal/sync/singleflight | BSD-3-Clause |
github.com/bahlo/generic-list-go | BSD-3-Clause |
github.com/buger/jsonparser | MIT |
github.com/cenkalti/backoff/v4 | MIT |
github.com/cloudquery/cloudquery-api-go | MPL-2.0 |
github.com/cloudquery/codegen/jsonschema | MPL-2.0 |
github.com/cloudquery/plugin-pb-go | MPL-2.0 |
github.com/cloudquery/plugin-sdk/v2/internal/glob | MIT |
github.com/cloudquery/plugin-sdk/v2/schema | MIT |
github.com/cloudquery/plugin-sdk/v2/types | MPL-2.0 |
github.com/cloudquery/plugin-sdk/v4 | MPL-2.0 |
github.com/cloudquery/plugin-sdk/v4/glob | MIT |
github.com/cloudquery/plugin-sdk/v4/scalar | MIT |
github.com/davecgh/go-spew/spew | ISC |
github.com/eapache/go-resiliency/breaker | MIT |
github.com/eapache/go-xerial-snappy | MIT |
github.com/eapache/queue | MIT |
github.com/ghodss/yaml | MIT |
github.com/go-logr/logr | Apache-2.0 |
github.com/go-logr/stdr | Apache-2.0 |
github.com/goccy/go-json | MIT |
github.com/golang/snappy | BSD-3-Clause |
github.com/google/flatbuffers/go | Apache-2.0 |
github.com/google/uuid | BSD-3-Clause |
github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors | Apache-2.0 |
github.com/grpc-ecosystem/grpc-gateway/v2 | BSD-3-Clause |
github.com/hashicorp/errwrap | MPL-2.0 |
github.com/hashicorp/go-cleanhttp | MPL-2.0 |
github.com/hashicorp/go-multierror | MPL-2.0 |
github.com/hashicorp/go-retryablehttp | MPL-2.0 |
github.com/hashicorp/go-uuid | MPL-2.0 |
github.com/invopop/jsonschema | MIT |
github.com/jcmturner/aescts/v2 | Apache-2.0 |
github.com/jcmturner/dnsutils/v2 | Apache-2.0 |
github.com/jcmturner/gofork | BSD-3-Clause |
github.com/jcmturner/gokrb5/v8 | Apache-2.0 |
github.com/jcmturner/rpc/v2 | Apache-2.0 |
github.com/klauspost/compress | Apache-2.0 |
github.com/klauspost/compress/internal/snapref | BSD-3-Clause |
github.com/klauspost/compress/zstd/internal/xxhash | MIT |
github.com/klauspost/cpuid/v2 | MIT |
github.com/mailru/easyjson | MIT |
github.com/mattn/go-colorable | MIT |
github.com/mattn/go-isatty | MIT |
github.com/oapi-codegen/runtime | Apache-2.0 |
github.com/pierrec/lz4/v4 | BSD-3-Clause |
github.com/pmezard/go-difflib/difflib | BSD-3-Clause |
github.com/rcrowley/go-metrics | BSD-2-Clause-FreeBSD |
github.com/rs/zerolog | MIT |
github.com/santhosh-tekuri/jsonschema/v6 | Apache-2.0 |
github.com/spf13/cobra | Apache-2.0 |
github.com/spf13/pflag | BSD-3-Clause |
github.com/stretchr/testify | MIT |
github.com/thoas/go-funk | MIT |
github.com/wk8/go-ordered-map/v2 | Apache-2.0 |
github.com/zeebo/xxh3 | BSD-2-Clause |
go.opentelemetry.io/otel | Apache-2.0 |
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp | Apache-2.0 |
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp | Apache-2.0 |
go.opentelemetry.io/otel/exporters/otlp/otlptrace | Apache-2.0 |
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp | Apache-2.0 |
go.opentelemetry.io/otel/log | Apache-2.0 |
go.opentelemetry.io/otel/metric | Apache-2.0 |
go.opentelemetry.io/otel/sdk | Apache-2.0 |
go.opentelemetry.io/otel/sdk/log | Apache-2.0 |
go.opentelemetry.io/otel/sdk/metric | Apache-2.0 |
go.opentelemetry.io/otel/trace | Apache-2.0 |
go.opentelemetry.io/proto/otlp | Apache-2.0 |
golang.org/x/crypto | BSD-3-Clause |
golang.org/x/exp | BSD-3-Clause |
golang.org/x/net | BSD-3-Clause |
golang.org/x/sync/errgroup | BSD-3-Clause |
golang.org/x/sys | BSD-3-Clause |
golang.org/x/text | BSD-3-Clause |
golang.org/x/xerrors | BSD-3-Clause |
google.golang.org/genproto/googleapis/api/httpbody | Apache-2.0 |
google.golang.org/genproto/googleapis/rpc/status | Apache-2.0 |
google.golang.org/grpc | Apache-2.0 |
google.golang.org/protobuf | BSD-3-Clause |
gopkg.in/yaml.v2 | Apache-2.0 |
gopkg.in/yaml.v3 | MIT |