Data Pipelines

Pipelines are how you read and write data being processed by CHEESE.

class cheese.pipeline.Pipeline

Bases: object

Abstract base class for a data pipeline. A pipeline fetches data from a data source and posts results to a data destination.

dequeue_task(msg)

abstract exhausted() → bool

Whether the data source is exhausted, i.e. there is no more data to read.

abstract fetch() → cheese.data.BatchElement

Fetches the next BatchElement from the data source, assuming the source is not exhausted.

abstract get_stats() → Dict

Returns statistics about the pipeline. The contents are likely to differ between Pipeline subclasses.

init_connection(connection: b_rabbit.b_rabbit.BRabbit)

Initializes the RabbitMQ connection.

abstract post(batch_element: cheese.data.BatchElement)

Posts a completed batch element to the data destination.

queue_task() → bool

Creates a task and queues it to the client.

Returns

True if successful, False if the pipeline is exhausted.

Return type

bool
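The following is a minimal sketch of the Pipeline contract, not CHEESE's implementation. To stay self-contained it defines a stand-in Pipeline ABC instead of importing cheese, uses a plain dict in place of cheese.data.BatchElement, and replaces the RabbitMQ client with a local pending list. The ListPipeline subclass, the pending attribute, and the queue_task body are hypothetical; they only mirror the documented behaviour that queue_task returns False once the pipeline is exhausted.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class Pipeline(ABC):
    """Hypothetical stand-in mirroring the documented Pipeline interface."""

    def __init__(self) -> None:
        # Stand-in for the RabbitMQ client queue that queue_task() feeds.
        self.pending: List[Dict[str, Any]] = []

    @abstractmethod
    def exhausted(self) -> bool:
        """True once the data source has nothing left to read."""

    @abstractmethod
    def fetch(self) -> Dict[str, Any]:
        """Return the next element; only called when not exhausted."""

    @abstractmethod
    def post(self, batch_element: Dict[str, Any]) -> None:
        """Write a completed element to the data destination."""

    @abstractmethod
    def get_stats(self) -> Dict:
        """Pipeline-specific statistics."""

    def queue_task(self) -> bool:
        # Mirrors the documented contract: False if exhausted, else queue a task.
        if self.exhausted():
            return False
        self.pending.append(self.fetch())
        return True


class ListPipeline(Pipeline):
    """Hypothetical subclass: reads from one list, writes to another."""

    def __init__(self, items: List[Any]) -> None:
        super().__init__()
        self.items = list(items)
        self.position = 0
        self.results: List[Dict[str, Any]] = []

    def exhausted(self) -> bool:
        return self.position >= len(self.items)

    def fetch(self) -> Dict[str, Any]:
        element = {"data": self.items[self.position]}
        self.position += 1
        return element

    def post(self, batch_element: Dict[str, Any]) -> None:
        self.results.append(batch_element)

    def get_stats(self) -> Dict:
        return {"read": self.position, "written": len(self.results)}


pipeline = ListPipeline(["a", "b"])
while pipeline.queue_task():
    pass
# Simulate a client labelling each queued element and sending it back.
for element in pipeline.pending:
    element["label"] = element["data"].upper()
    pipeline.post(element)
print(pipeline.get_stats())  # {'read': 2, 'written': 2}
```

Keeping exhausted() separate from fetch() lets the caller check for remaining data before committing to a task, which is why fetch() can assume the source is not exhausted.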

class cheese.pipeline.datasets.DatasetPipeline(format: str = 'csv', save_every: int = 1)

Bases: cheese.pipeline.Pipeline

Base class for any pipeline whose data destination is a datasets.Dataset object.

Parameters
  • format (str) – Format to save the result dataset in. Can be 'arrow' or 'csv'. Defaults to 'csv'.

  • save_every (int) – How often to save: the dataset is saved each time this many rows have been added.

add_row_to_dataset(row: Dict[str, Any])

Adds a single row to the result dataset and then saves.

Parameters

row (Dict[str, Any]) – The row, as a dictionary, to add to the result dataset
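A minimal sketch of how save_every might govern saving in add_row_to_dataset. RowAccumulator is a hypothetical stand-in, not CHEESE code: it keeps rows in a list instead of a datasets.Dataset, and its save_dataset only records the row count at which a save ran. The modulo check is an assumption about how save_every is applied; with the default save_every=1 every added row triggers a save, which matches the description above.

```python
from typing import Any, Dict, List


class RowAccumulator:
    """Hypothetical stand-in for DatasetPipeline's row-adding logic."""

    def __init__(self, save_every: int = 1) -> None:
        self.save_every = save_every
        self.rows: List[Dict[str, Any]] = []
        self.save_points: List[int] = []  # row counts at which a save ran

    def save_dataset(self) -> None:
        # A real pipeline would write to disk here; we only record the event.
        if self.rows:
            self.save_points.append(len(self.rows))

    def add_row_to_dataset(self, row: Dict[str, Any]) -> None:
        self.rows.append(row)
        # Assumption: save once every `save_every` added rows.
        if len(self.rows) % self.save_every == 0:
            self.save_dataset()


acc = RowAccumulator(save_every=2)
for i in range(5):
    acc.add_row_to_dataset({"id": i})
print(acc.save_points)  # [2, 4]
```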

load_dataset() → bool

Loads the result dataset from write_path. Returns False if the load fails. Assumes write_path has already been set.

Returns

Whether the load was successful.

Return type

bool

save_dataset()

Saves the result dataset to the write path (assuming the subclass has specified one). Does nothing if there is no data to save yet.
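The save/load behaviour described for DatasetPipeline can be illustrated with the standard-library csv module. CsvResultStore is a hypothetical stand-in, not CHEESE code (which writes a datasets.Dataset in csv or arrow format); it only mirrors two documented details: save_dataset does nothing when there are no rows yet, and load_dataset returns False when loading fails.

```python
import csv
import os
import tempfile
from typing import Any, Dict, List


class CsvResultStore:
    """Hypothetical stdlib stand-in for DatasetPipeline's save/load behaviour."""

    def __init__(self, write_path: str) -> None:
        self.write_path = write_path
        self.rows: List[Dict[str, Any]] = []

    def save_dataset(self) -> None:
        if not self.rows:
            return  # no data yet: leave write_path untouched
        with open(self.write_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(self.rows[0].keys()))
            writer.writeheader()
            writer.writerows(self.rows)

    def load_dataset(self) -> bool:
        try:
            with open(self.write_path, newline="") as f:
                self.rows = list(csv.DictReader(f))
        except OSError:
            return False  # e.g. nothing has been saved at write_path yet
        return True


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "results.csv")
    store = CsvResultStore(path)
    loaded_before_save = store.load_dataset()  # False: no file yet
    store.save_dataset()  # no rows, so nothing is written
    file_after_empty_save = os.path.exists(path)  # False
    store.rows.append({"text": "hello", "label": 1})
    store.save_dataset()
    restored = CsvResultStore(path)
    loaded = restored.load_dataset()
    print(loaded, restored.rows)  # True [{'text': 'hello', 'label': '1'}]
```

One trade-off visible in this sketch is that a CSV round trip returns every value as a string (the label 1 comes back as '1').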

class cheese.pipeline.iterable_dataset.IterablePipeline(iter: Iterable, write_path: str, force_new: bool = False, max_length=inf, **kwargs)

Bases: cheese.pipeline.datasets.DatasetPipeline

Base class for any pipeline that reads from an iterable dataset. Writes results to a datasets Dataset.

Parameters
  • iter (Iterable) – The iterable to read data from

  • write_path (str) – Path to write result dataset to

  • force_new (bool) – Whether to force a new dataset (as opposed to recovering saved progress from write_path)

  • max_length – Maximum number of entries to produce for the output dataset. Defaults to infinity.
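Below is a sketch of how exhausted(), fetch_next() and max_length could fit together in an IterablePipeline. IterableSource is hypothetical; its parameter is named iterable rather than iter to avoid shadowing Python's built-in iter(), and the one-item lookahead buffer is an assumption that lets exhausted() check for remaining data without losing an item. The produced counter stands in for the progress that post_row() updates.

```python
import math
from typing import Any, Iterable, List, Optional


class IterableSource:
    """Hypothetical stand-in for IterablePipeline's reading logic."""

    def __init__(self, iterable: Iterable, max_length: float = math.inf) -> None:
        self._it = iter(iterable)
        self.max_length = max_length
        self.produced = 0  # rows posted so far; post_row() increments it
        self._buffer: List[Any] = []  # one-item lookahead

    def _peek(self) -> bool:
        # Pull one item into the buffer if needed; report whether one is available.
        if not self._buffer:
            try:
                self._buffer.append(next(self._it))
            except StopIteration:
                return False
        return True

    def exhausted(self) -> bool:
        return self.produced >= self.max_length or not self._peek()

    def fetch_next(self) -> Optional[Any]:
        # Returns None when the source has no further item.
        return self._buffer.pop() if self._peek() else None

    def post_row(self, row: dict) -> None:
        self.produced += 1  # a real pipeline would also add and save the row


source = IterableSource(range(10), max_length=3)
values = []
while not source.exhausted():
    item = source.fetch_next()
    source.post_row({"value": item})
    values.append(item)
print(values)  # [0, 1, 2]
```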

exhausted() → bool

Whether the data source is exhausted, i.e. there is no more data to read.

abstract fetch() → cheese.data.BatchElement

Fetches a batch element from the data source. In most cases this should call fetch_next().

fetch_next() → Any

Attempts to get the next item from the iterable data source. Returns None if there is no such item.

get_stats() → Dict

Returns statistics about the pipeline. The contents are likely to differ between Pipeline subclasses.

abstract post(batch_element: cheese.data.BatchElement)

Posts a completed batch element to the data destination. In most cases this should call post_row() before returning.

post_row(row: Dict[str, Any])

Given a row to add to the result dataset, updates progress, adds the row, and saves.

preprocess(x: Any) → Any

Applied to every item the data source yields as it is iterated over. It should also validate the data and raise InvalidDataException if the data is invalid.
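A minimal sketch of a preprocess() override that validates as well as transforms. InvalidDataException is defined locally as a stand-in for the exception named above, the normalisation rule is made up for illustration, and the hypothetical preprocessed() helper assumes that items failing validation are simply skipped; the documentation does not say how the pipeline handles them.

```python
from typing import Any, Iterable, Iterator


class InvalidDataException(Exception):
    """Local stand-in for the exception named in the preprocess docstring."""


def preprocess(x: Any) -> Any:
    # Hypothetical rule: accept non-empty strings, normalised to lower case.
    if not isinstance(x, str) or not x.strip():
        raise InvalidDataException(f"invalid item: {x!r}")
    return x.strip().lower()


def preprocessed(items: Iterable[Any]) -> Iterator[Any]:
    # Assumption: items whose preprocessing raises InvalidDataException are skipped.
    for item in items:
        try:
            yield preprocess(item)
        except InvalidDataException:
            continue


print(list(preprocessed(["  Hello ", "", 42, "WORLD"])))  # ['hello', 'world']
```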

save_dataset()

Saves the dataset and progress.

class cheese.pipeline.write_only.WriteOnlyPipeline(write_path: str, force_new: bool = False, **kwargs)

Bases: cheese.pipeline.datasets.DatasetPipeline

Base pipeline for any task that gives users empty data but writes concrete results (e.g. prompting a model to generate output, then receiving feedback).

Parameters
  • write_path (str) – The path to write the result dataset to

  • force_new (bool) – Whether to force a new dataset to be created, even if one already exists at the write path

abstract fetch() → cheese.data.BatchElement

Generates an empty BatchElement to send to the client.

abstract post(batch_element: cheese.data.BatchElement)

Posts a completed batch element to the data destination.
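The write-only pattern can be sketched as a fetch() that hands out an empty element and a post() that records what the client filled in. PromptElement and FeedbackWriter are hypothetical stand-ins for a BatchElement subclass and a WriteOnlyPipeline subclass; the generation and feedback fields are illustrative only.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class PromptElement:
    """Hypothetical BatchElement stand-in: starts empty, filled in by the client."""
    generation: Optional[str] = None
    feedback: Optional[str] = None


class FeedbackWriter:
    """Hypothetical write-only pipeline: no input data, only results."""

    def __init__(self) -> None:
        self.rows: List[Dict[str, Any]] = []

    def fetch(self) -> PromptElement:
        return PromptElement()  # nothing to read, so hand out an empty element

    def post(self, batch_element: PromptElement) -> None:
        self.rows.append({"generation": batch_element.generation,
                          "feedback": batch_element.feedback})


writer = FeedbackWriter()
element = writer.fetch()
# Simulated client: a model generates text and a user rates it.
element.generation = "The sky is blue."
element.feedback = "good"
writer.post(element)
print(writer.rows)  # [{'generation': 'The sky is blue.', 'feedback': 'good'}]
```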

class cheese.pipeline.wav_folder.WavFolderPipeline(read_path: str, write_path: str, force_new: bool = False)

Bases: cheese.pipeline.datasets.DatasetPipeline

Base pipeline for audio datasets stored as a directory of .wav files. Writes results to a dataset in the standard datasets format.

Parameters
  • read_path (str) – Path to the directory of .wav files to read from

  • write_path (str) – Path to the directory to write the resulting dataset to

  • force_new (bool) – Whether to force a new dataset (as opposed to recovering saved progress from write_path)

exhausted() → bool

Whether the data source is exhausted, i.e. there is no more data to read.

abstract fetch() → cheese.data.BatchElement

Fetches a batch element from the data source. In most cases this should call id_pop() to get the file path.

id_complete(id: int, row: Dict[str, Any])

Given a row to add to the dataset, marks the corresponding entry in index_book as complete.

id_pop() → Dict[str, Any]

Pops an id and path from the index_book queue. Returns a dict that can be passed directly to a batch element constructor as keyword arguments.

abstract post(batch_element: cheese.data.BatchElement)

Posts a completed batch element to the data destination. In most cases this should call id_complete() before returning.

save_dataset()

Saves the result dataset and, specific to WavFolderPipeline, an index book recording which audio files have been processed so far.
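The index book bookkeeping behind id_pop() and id_complete() might look roughly like the following sketch. WavIndexBook is hypothetical. Several details are assumptions: the id/path keys returned by id_pop(), the complete flag, the sorted file order, and the JSON file written by save_index(). In WavFolderPipeline itself, the index book is saved by save_dataset().

```python
import json
import os
import tempfile
from collections import deque
from typing import Any, Deque, Dict, List


class WavIndexBook:
    """Hypothetical sketch of the index_book bookkeeping described above."""

    def __init__(self, read_path: str) -> None:
        wavs = sorted(f for f in os.listdir(read_path) if f.lower().endswith(".wav"))
        self.entries: List[Dict[str, Any]] = [
            {"id": i, "path": os.path.join(read_path, name), "complete": False}
            for i, name in enumerate(wavs)
        ]
        self.queue: Deque[int] = deque(e["id"] for e in self.entries)
        self.rows: List[Dict[str, Any]] = []

    def exhausted(self) -> bool:
        return not self.queue

    def id_pop(self) -> Dict[str, Any]:
        idx = self.queue.popleft()
        # Keys chosen so the dict can be passed as keyword arguments.
        return {"id": idx, "path": self.entries[idx]["path"]}

    def id_complete(self, id: int, row: Dict[str, Any]) -> None:
        self.entries[id]["complete"] = True
        self.rows.append(row)

    def save_index(self, index_path: str) -> None:
        with open(index_path, "w") as f:
            json.dump(self.entries, f)


with tempfile.TemporaryDirectory() as tmp:
    for name in ("b.wav", "a.wav", "notes.txt"):
        open(os.path.join(tmp, name), "wb").close()  # empty placeholder files
    book = WavIndexBook(tmp)
    first = book.id_pop()
    book.id_complete(first["id"], {"file": os.path.basename(first["path"]), "label": "speech"})
    index_path = os.path.join(tmp, "index_book.json")
    book.save_index(index_path)
    with open(index_path) as f:
        saved = json.load(f)
    print([e["complete"] for e in saved])  # [True, False]
```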