Uploading data
Create a new dataset
import redivis
# The dataset is created under a user here. It could also be created under an
# organization:
#   dataset = redivis.organization("Demo organization").dataset("some dataset")
owner = redivis.user("your-username")
dataset = owner.dataset("some dataset")

# Accepted public_access_level values:
#   'none', 'overview', 'metadata', 'sample', 'data'
dataset.create(public_access_level="overview")
Create a table and upload data
import redivis
# Reference the "next" version of the dataset.
dataset = redivis.user("user_name").dataset("dataset_name", version="next")

# Create a table on the dataset. Datasets may have multiple tables
table = (
    dataset
    .table("Table name")
    .create(description="Some description")
)

# Upload a file to the table.
# You can create multiple uploads per table, in which case they'll be appended together.
upload = table.upload("data.csv")

# upload.create() is called inside the `with` block so that the file handle is
# still open while it is passed to the upload.
with open("data.csv", "rb") as file:
    upload.create(
        file,
        type="delimited",
        remove_on_fail=True,  # Remove the upload if a failure occurs
        wait_for_finish=True,  # Wait for the upload to finish processing
        raise_on_fail=True,  # Raise an error on failure
    )
Upload non-tabular files
import redivis
# Reference the "next" version of the dataset.
dataset = redivis.user("user_name").dataset("dataset_name", version="next")

# Non-tabular files must be uploaded to file index tables
table = dataset.table("my_files").create(is_file_index=True)

# Upload all contents in a directory
table.add_files(directory="/path/to/directory/")

# Upload specific files: `files` is given as a list with one dict per file.
table.add_files(
    files=[
        {"path": "/path/to/file.png"},  # file name will be "file.png"
        {"path": "/path/to/other/file.png", "name": "other_file.png"},  # file name will be other_file.png
        {"data": "Hello world", "name": "hello_world.txt"},  # Data can be string or bytes
    ]
)
Upload data from an external source
# Assuming we get a reference to the table the same as above...
upload = table.upload("data.csv")

# Describe the external source that the data is transferred from.
transfer_specification = {
    # One of: gcs, s3, bigQuery, url, redivis
    "sourceType": "gcs",
    # The expected sourcePath format depends on sourceType:
    #   url:      "https://example.com/data-file"
    #   bigQuery: "workflow_name.dataset_name.table_name"
    #   redivis:  "owner_name.dataset_or_workflow_name.table_name"
    # NOTE(review): BigQuery tables are normally addressed as
    # project.dataset.table, so the first bigQuery segment is presumably the
    # GCP project name rather than a workflow name — confirm.
    "sourcePath": "my-bucket/path-to-my-file.csv",
    # The email associated with the data source
    "identity": "my_email@example.com",
}

upload.create(transfer_specification=transfer_specification)
Stream data to an upload
import redivis
# Variable definitions for the streamed data.
# The schema is optional if update_schema is set to True on the insert_rows request.
schema = [
    {"name": "var1", "type": "string"},
    {"name": "var2", "type": "integer"},
    {"name": "var3", "type": "dateTime"},
]

# The records to stream.
# dateTime values must be in the format YYYY-MM-DD[ |T]HH:MM:SS[.ssssss]
rows = [
    {"var1": "hello", "var2": 1, "var3": None},
    {"var1": "world", "var2": 2, "var3": "2020-01-01T00:00:00.123"},
]

# Reference the table on the "next" version of the dataset.
user = redivis.user("user_name")
dataset = user.dataset("dataset_name", version="next")
table = dataset.table("table_name")

# Each upload is referenced by its name, which must be unique amongst other
# uploads for the current version of this table.
upload = table.upload(name="some_streamed_data")

# Only call create if the upload doesn't already exist
upload.create(
    type="stream",
    # Optional if update_schema is set to True on insert_rows
    schema=schema,
    # If True, will only create the upload if an upload with this name doesn't
    # already exist. Otherwise, a counter will be added to the name to preserve
    # name uniqueness.
    if_not_exists=False,
    # If True, records that are incompatible with the existing schema are
    # ignored. This has no effect when update_schema is set to True on the
    # insert_rows request. Optional, default is False.
    skip_bad_records=False,
)

insert_response = upload.insert_rows(
    rows,
    # If True, variables can be added by subsequent streams, and variable types
    # will be relaxed if new values are incompatible with the previous type.
    # If False, an error will be thrown if a row would cause a schema update,
    # unless skip_bad_records is set to True on the upload (in which case
    # they'll be ignored).
    update_schema=False,
)

# See REST API / uploads / insertRows
print(insert_response)
Release a new version
import redivis
# Reference the dataset under its organization, then release it.
organization = redivis.organization("Demo")
dataset = organization.dataset("some dataset")
dataset.release()
Create a subsequent version on an existing dataset
import redivis
dataset = redivis.user("your-username").dataset("some dataset")

# dataset.create_next_version will throw an error if a "next" version already exists,
# unless the ignore_if_exists argument is provided
dataset = dataset.create_next_version(ignore_if_exists=True)
table = dataset.table("table name")

# By default, all new data is appended to the previous version of a table.
# If you'd like to replace the previous data, update the upload_merge_strategy.
table.update(upload_merge_strategy="replace")

upload = table.upload("data.csv")

# upload.create() is called inside the `with` block so that the file handle is
# still open while it is passed to the upload.
with open("data.csv", "rb") as file:
    upload.create(
        file,
        # All additional params are optional; default values are shown here
        type="delimited",  # One of stream, delimited, csv, ndjson, avro, parquet, orc, xls, xlsx, dta, sas7bdat, sav
        skip_bad_records=False,
        has_header_row=True,  # Only relevant for csv, xls(x)
        remove_on_fail=True,  # Remove the upload if a failure occurs
        wait_for_finish=True,  # Wait for the upload to finish processing
        raise_on_fail=True,  # Raise an error on failure
        # The following are only relevant for delimited files:
        allow_quoted_newlines=False,  # Allow newlines within cells. Setting to True will substantially reduce ingest performance.
        quote_character='"',  # The character used to escape delimiters within cells. Generally a double quote in compliant CSVs.
        delimiter=None,  # For delimited files, explicitly set the delimiter, otherwise the delimiter will be automatically inferred.
    )

# When all uploads have finished, release the next version
dataset.release()
Last updated