Examples

Reading table data

import redivis

table = (
    redivis
    .organization("Demo")
    .dataset("CMS 2014 Medicare Data")
    .table("Hospice providers")
)

# We can specify an optional max_results argument to only load some of the records
for row in table.list_rows(max_results=100):
    print(row)

# Max results can also be the first positional argument
df = table.to_dataframe(100)
print(df)
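
The object returned by to_dataframe is typically an ordinary pandas DataFrame, so the usual pandas operations apply once the data is loaded. A minimal sketch, where the "state" column name is a hypothetical used only for illustration:

# A minimal sketch: the loaded dataframe behaves like any other pandas DataFrame.
# "state" is a hypothetical column name used for illustration.
df = table.to_dataframe(max_results=1000)
print(df.shape)                    # (rows, columns) actually loaded
print(df["state"].value_counts())  # summarize a column with ordinary pandas calls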

Querying data

Execute a query

import redivis

# Perform a query on the Demo CMS Medicare data. Table at https://redivis.com/demo/datasets/1754/tables
query = redivis.query("""
    SELECT * FROM demo.cms_2014_medicare_data.home_health_agencies
    WHERE state = 'CA'
""")
for row in query.list_rows():
    print(row["agency_name"])

# We can also use data frames
df = query.to_dataframe()
print(df)
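
Queries accept standard SQL beyond simple filters. A minimal sketch of an aggregation over the same fully-qualified table reference shown above:

# A minimal sketch: aggregate over the same table using standard SQL
query = redivis.query("""
    SELECT state, COUNT(*) AS agency_count
    FROM demo.cms_2014_medicare_data.home_health_agencies
    GROUP BY state
    ORDER BY agency_count DESC
""")
print(query.to_dataframe())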

Execute a scoped query

import redivis

# Perform a query on the Demo CMS Medicare data.
# Table at https://redivis.com/demo/datasets/1754/tables
# We don't need to include fully-qualified table names
# if we scope our query to the appropriate dataset or project
query = (
    redivis
    .organization("Demo")
    .dataset("CMS 2014 Medicare Data")
    .query("""
        SELECT provider_name, average_total_payments
        FROM nursing_facilities
        INNER JOIN outpatient_charges USING (provider_id)
        WHERE state = 'CA'
    """)
)
for row in query.list_rows():
    print(row.provider_name)

# We can also use data frames
df = query.to_dataframe()
print(df)

Working with files

Load a single file

import redivis
from io import TextIOWrapper

file = redivis.file("rnwk-acs3famee.pVr4Gzq54L3S9pblMZTs5Q")

# Download the file
download_location = file.download("./my-downloads")
f = open(download_location, "r")

# Read the file
file_content = file.read(as_text=True)
print(file_content)

# Stream the file
with TextIOWrapper(file.stream()) as f:
    f.readline()  # read first line
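
For larger files you can also consume the stream in fixed-size binary chunks rather than reading everything into memory. A minimal sketch, assuming file.stream() returns a binary file-like object (as the TextIOWrapper usage above implies):

# A minimal sketch: read the stream in binary chunks.
# Assumes file.stream() returns a binary file-like object, as implied above.
stream = file.stream()
total_bytes = 0
while True:
    chunk = stream.read(65536)
    if not chunk:
        break
    total_bytes += len(chunk)
print(f"Read {total_bytes} bytes")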

Load all files in a table

import redivis

table = redivis.user("username").dataset("dataset_name").table("table_name")

# Download all files
table.download_files("./table_files/")

# Read all files into memory — only do this for a few small files!
files = table.list_files()
file_contents = [
    file.read(as_text=True)
    for file in files
]
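
If you only need some of the files, you can iterate over them and download each one individually with the same file.download call shown earlier. The max_results limit on list_files is an assumption here (mirroring list_rows), so treat that parameter as hypothetical:

# A minimal sketch: download only the first few files one at a time.
# max_results on list_files is an assumption mirroring list_rows.
for file in table.list_files(max_results=10):
    location = file.download("./table_files/")
    print(f"Downloaded to {location}")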

Uploading data

Create a new dataset

import redivis
# Could also create a dataset under an organization:
# dataset = redivis.organization("Demo organization").dataset("some dataset")
dataset = redivis.user("your-username").dataset("some dataset")
# public_access_level can be one of ('none', 'overview', 'metadata', 'sample', 'data')
dataset.create(public_access_level="overview")

Create a table and upload data

import redivis

dataset = redivis.user("user_name").dataset("dataset_name", version="next")

# Create a table on the dataset. Datasets may have multiple tables
table = (
    dataset
    .table("Table name")
    .create(description="Some description")
)

# Upload a file to the table.
# You can create multiple uploads per table, in which case they'll be appended together.
upload = table.upload("data.csv")
with open("data.csv", "rb") as file:
    upload.create(
        file,
        type="delimited",
        remove_on_fail=True,   # Remove the upload if a failure occurs
        wait_for_finish=True,  # Wait for the upload to finish processing
        raise_on_fail=True     # Raise an error on failure
    )
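
Since multiple uploads to the same table are appended together (as noted above), a second file can be added with another upload in exactly the same way. A minimal sketch, where data2.csv is a hypothetical second file:

# A minimal sketch: a second upload to the same table is appended to the first.
# "data2.csv" is a hypothetical file name.
second_upload = table.upload("data2.csv")
with open("data2.csv", "rb") as file:
    second_upload.create(
        file,
        type="delimited",
        wait_for_finish=True,
        raise_on_fail=True
    )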

Upload data from external source

# Assuming we get a reference to the table the same as above...
upload = table.upload("data.csv")
upload.create(
    transfer_specification={
        "sourceType": "gcs",  # one of gcs, s3, bigQuery, url, redivis
        "sourcePath": "my-bucket/path-to-my-file.csv",
        # "sourcePath": "https://example.com/data-file", (for sourceType == "url")
        # "sourcePath": "project_name.dataset_name.table_name", (for sourceType == "bigQuery")
        # "sourcePath": "owner_name.dataset_or_project_name.table_name", (for sourceType == "redivis")
        "identity": "[email protected]"  # The email associated with the data source
    },
)

Stream data to an upload

import redivis

dataset = redivis.user("user_name").dataset("dataset_name", version="next")
table = dataset.table("table_name")

# schema is optional if update_schema is set to True on the insert_rows request
schema = [
    { "name": "var1", "type": "string" },
    { "name": "var2", "type": "integer" },
    { "name": "var3", "type": "dateTime" }
]
rows = [
    { "var1": "hello", "var2": 1, "var3": None },
    # dateTime must be in the format YYYY-MM-DD[ |T]HH:MM:SS[.ssssss]
    { "var1": "world", "var2": 2, "var3": "2020-01-01T00:00:00.123" }
]

# Reference each upload with its name, which must be unique amongst other uploads
# for the current version of this table.
upload = table.upload(name="some_streamed_data")

# Only call create if the upload doesn't already exist
upload.create(
    type="stream",
    # schema is optional if update_schema is set to True on insert_rows
    schema=schema,
    # If True, will only create the upload if an upload with this name doesn't already exist.
    # Otherwise, a counter will be added to the name to preserve name uniqueness.
    if_not_exists=False,
    # If skip_bad_records is True, ignore records that are incompatible with the existing schema.
    # This has no effect when update_schema is set to True on the insert_rows request.
    skip_bad_records=False  # Optional, default is False
)

insert_response = upload.insert_rows(
    rows,
    # If update_schema is set to True, variables can be added by subsequent streams,
    # and variable types will be relaxed if new values are incompatible with the previous type.
    # If False, an error will be thrown if a row would cause a schema update,
    # unless skip_bad_records is set to True on the upload (in which case they'll be ignored)
    update_schema=False,
)

# See REST API / uploads / insertRows
print(insert_response)
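
For larger volumes you can stream in batches by calling insert_rows repeatedly on the same upload. A minimal sketch using a plain Python loop, where all_rows is a hypothetical list of dicts matching the schema above:

# A minimal sketch: stream rows to the same upload in fixed-size batches.
# "all_rows" is a hypothetical list of dicts matching the schema above.
all_rows = rows * 100
batch_size = 50
for start in range(0, len(all_rows), batch_size):
    batch = all_rows[start:start + batch_size]
    upload.insert_rows(batch, update_schema=False)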

Release a new version

import redivis
dataset = redivis.organization("Demo").dataset("some dataset")
dataset.release()

Create a subsequent version on an existing dataset

import redivis

dataset = redivis.user("your-username").dataset("some dataset")

# dataset.create_next_version will throw an error if a "next" version already exists,
# unless the ignore_if_exists argument is provided
dataset = dataset.create_next_version(ignore_if_exists=True)
table = dataset.table("table name")

# By default, all new data is appended to the previous version of a table.
# If you'd like to replace the previous data, update the upload_merge_strategy.
table.update(upload_merge_strategy="replace")

upload = table.upload("data.csv")
with open("data.csv", "rb") as file:
    upload.create(
        file,
        # All additional params are optional; default values are shown here
        type="delimited",       # One of stream, delimited, csv, ndjson, avro, parquet, orc, xls, xlsx, dta, sas7bdat, sav
        skip_bad_records=False,
        has_header_row=True,    # Only relevant for csv, xls(x)
        remove_on_fail=True,    # Remove the upload if a failure occurs
        wait_for_finish=True,   # Wait for the upload to finish processing
        raise_on_fail=True,     # Raise an error on failure
        # The following are only relevant for delimited files:
        allow_quoted_newlines=False,  # Allow newlines within cells. Setting to True will substantially reduce ingest performance.
        quote_character='"',          # The character used to escape delimiters within cells. Generally a double quote in compliant CSVs.
        delimiter=None,               # Explicitly set the delimiter; otherwise the delimiter will be automatically inferred.
    )

# When all uploads have finished, release the next version
dataset.release()
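
Once the version has been released, the table can be read back with the same calls shown in the reading examples above. A minimal sketch:

# A minimal sketch: read the newly released data back, as in the examples above.
dataset = redivis.user("your-username").dataset("some dataset")
df = dataset.table("table name").to_dataframe()
print(df)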