Uploading data

Create a new dataset

import redivis

# Could also create a dataset under an organization:
# dataset = redivis.organization("Demo organization").dataset("some dataset")
dataset = redivis.user("your-username").dataset("some dataset")

# public_access_level can be one of ('none', 'overview', 'metadata', 'sample', 'data')
dataset.create(public_access_level="overview")

Create a table and upload data

import redivis

dataset = redivis.user("user_name").dataset("dataset_name", version="next")

# Create a table on the dataset. Datasets may have multiple tables
table = (
    dataset
    .table("Table name")
    .create(description="Some description")
)

# Upload a file to the table. 
# You can create multiple uploads per table, in which case they'll be appended together.
upload = table.upload("data.csv")

with open("data.csv", "rb") as file:
    upload.create(
        file, 
        type="delimited",
        remove_on_fail=True,    # Remove the upload if a failure occurs
        wait_for_finish=True,   # Wait for the upload to finish processing
        raise_on_fail=True      # Raise an error on failure
    )

Upload non-tabular files

import redivis
dataset = redivis.user("user_name").dataset("dataset_name", version="next")

# Non-tabular files must be uploaded to file index tables
table = dataset.table("my_files").create(is_file_index=True)

# upload all contents in a directory
table.add_files(directory="/path/to/directory/")

# upload specific files
table.add_files(files=list(
    {"path": "/path/to/file.png"}, # file name will be "file.png"
    {"path": "/path/to/other/file.png", "name": "other_file.png"}, # file name will be other_file.png
    {"data": "Hello world", "name": "hello_world.txt"} # Data can be string or bytes 
)

Upload data from an external source

# Assuming we get a reference to the table the same as above...

upload = table.upload("data.csv")

upload.create(
    transfer_specification={
        "sourceType": "gcs", # one of gcs, s3, bigQuery, url, redivis
        "sourcePath": "my-bucket/path-to-my-file.csv", 
        # "sourcePath": "https://example.com/data-file", (for sourceType == "url")
        # "sourcePath": "workflow_name.dataset_name.table_name", (for sourceType == "bigQuery")
        # "sourcePath": "owner_name.dataset_or_workflow_name.table_name", (for sourceType == "redivis")
        "identity": "my_email@example.com" # The email associated with the data source
    },
)

Stream data to an upload

import redivis

dataset = redivis.user("user_name").dataset("dataset_name", version="next")
table = dataset.table("table_name")

# schema is optional if update_schema is set to True on the insert_rows request
schema = [
    { "name": "var1", "type": "string" }, 
    { "name": "var2", "type": "integer" },
    { "name": "var3", "type": "dateTime" }
]

rows = [
    { "var1": "hello", "var2": 1, "var3": None },
    # dateTime must be in the format YYYY-MM-DD[ |T]HH:MM:SS[.ssssss]
    { "var1": "world", "var2": 2, "var3": "2020-01-01T00:00:00.123" }
]

# Reference each upload with its name, which must be unique amongst other uploads
#   for the current version of this table.
upload = table.upload(name="some_streamed_data")

# Only call create if the upload doesn't already exist
upload.create(
    type="stream", 
    # schema is optional if update_schema is set to True on insert_rows
    schema=schema,
    # If True, will only create the upload if an upload with this name doesn't already exist
    # Otherwise, a counter will be added to the name to preserve name uniqueness          
    if_not_exists=False,
    # If skip_bad_records is True, ignore records that are incompatible with the existing schema. 
    # This has no effect when update_schema is set to True on the insert_rows request.  
    skip_bad_records=False # Optional, default is False
) 

insert_response = upload.insert_rows(
    rows, 
    # If update_schema is set to True, variables can be added by subsequent streams,
    #    and variable types will be relaxed if new values are incompatible with the previous type.
    # If False, an error will be thrown if a row would cause a schema update, 
    #    unless skip_bad_records is set to True on the upload (in which case they'll be ignored)
    update_schema=False,
)

# See REST API / uploads / insertRows
print(insert_response)

Release a new version

import redivis

dataset = redivis.organization("Demo").dataset("some dataset")
dataset.release()

Create a subsequent version on an existing dataset

import redivis

dataset = redivis.user("your-username").dataset("some dataset")

# dataset.create_next_version will throw an error if a "next" version already exists,
# unless the ignore_if_exists argument is provided
dataset = dataset.create_next_version(ignore_if_exists=True)
table = dataset.table("table name")

# By default, all new data is appended to the previous version of a table. 
# If you'd like to replace the previous data, update the upload_merge_strategy.
table.update(upload_merge_strategy="replace")

upload = table.upload("data.csv")
with open("data.csv", "rb") as file:
    upload.create(
        file,
        # All additional params are optional; default values are shown here
        type="delimited",            # One of stream, delimited, csv, ndjson, avro, parquet, orc, xls, xlsx, dta, sas7bdat, sav
        skip_bad_records=False,      
        has_header_row=True,         # Only relevant for csv, xls(x)
        remove_on_fail=True,    # Remove the upload if a failure occurs
        wait_for_finish=True,   # Wait for the upload to finish processing
        raise_on_fail=True      # Raise an error on failure
        
        # The following are only relevant for delimited files:
        allow_quoted_newlines=False, # Allow newlines within cells. Setting to True will substantially reduce ingest performance.
        quote_character='"',         # The character used to escape delimiters within cells. Generally a double quote in compliant CSVs.
        delimiter=None,              # For delimited files, explicitly set the delimiter, otherwise the delimiter will be automatically inferred.
    )

# When all uploads have finished, release the next version
dataset.release()

Last updated