We use tracking cookies to understand how you use the product and help us improve it. Please accept cookies to help us improve. You can always opt out later via the link in the footer.
kind: destination
spec:
name: "http"
path: "cloudquery/http"
registry: "cloudquery"
version: "v1.0.0"
spec:
base_url: "https://api.example.com"
# Optional: headers to send with each request
# headers:
# Authorization: Bearer <token>
base_url
(string
) (required)https://api.example.com
.headers
(map[string]string
) (optional)headers:
Authorization: Bearer <token>
POST /migrate
- Migrate table message received from the source #POST /migrate
endpoint is a JSON object with the following fields:table_name
- The name of the table.schema
- The schema to write in Arrow IPC format.POST /write
- Write a record message received from the source #POST /write
endpoint is a JSON object with the following fields:table_name
- The name of the table.record
- The record to write in Arrow IPC format.POST /delete_stale
- Delete stale data message received from the source #POST /delete_stale
endpoint is a JSON object with the following fields:table_name
- The name of the table to delete stale data for.source_name
- The name of the source to delete stale data for.sync_time
- The sync time of the current sync, data older than this should be deleted.pip install pyarrow==21.0.0
or via the following requirements.txt
file:pyarrow==21.0.0
from http.server import BaseHTTPRequestHandler, HTTPServer
import logging
import json
import pyarrow as pa
import pyarrow.ipc as ipc
import base64
class RequestHandler(BaseHTTPRequestHandler):
def _set_response(self):
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
def do_POST(self):
if self.path not in ['/write', '/migrate', '/delete_stale']:
self.send_error(404, "Endpoint not found")
return
content_length = int(self.headers['Content-Length']) # Get the size of the data
post_data = self.rfile.read(content_length) # Get the data
try:
json_data = json.loads(post_data)
except json.JSONDecodeError:
self.send_error(400, "Invalid JSON")
return
if self.path == '/write':
table_name = json_data['table_name']
buf = pa.py_buffer(base64.b64decode(json_data['record']))
reader = ipc.open_stream(buf)
record = reader.read_all()
# Do something with the record
# print(table_name, record)
elif self.path == '/migrate':
table_name = json_data['table_name']
buf = pa.py_buffer(base64.b64decode(json_data['schema']))
reader = ipc.open_stream(buf)
# Do something with the schema
# print(table_name, reader.schema)
elif self.path == '/delete_stale':
table_name = json_data['table_name']
source_name = json_data['source_name']
sync_time = json_data['sync_time']
# Do something with the delete stale data
# print(table_name, source_name, sync_time)
# Set response
self._set_response()
response = {
"message": "Request received",
"path": self.path
}
self.wfile.write(json.dumps(response).encode('utf-8'))
def run(server_class=HTTPServer, handler_class=RequestHandler, port=8080):
logging.basicConfig(level=logging.INFO)
server_address = ('', port)
httpd = server_class(server_address, handler_class)
logging.info('Starting server on port %d...\n', port)
httpd.serve_forever()
if __name__ == '__main__':
from sys import argv
if len(argv) == 2:
run(port=int(argv[1]))
else:
run()
python3 example/server.py 8080