Documentation

What is QuantumDatalytica?

Unifying developers, engineers, and users to build, orchestrate, and automate intelligent workflows with Quantum Machines.

Overview

QuantumDatalytica empowers Developers to build modular Quantum Machines, enables Workflow Engineers to design powerful data workflows, and allows Workflow Users to execute and automate tasks seamlessly, all within a secure, scalable, and cloud-native platform.

Workflows are created using DAG-based design, with parent-child machine dependencies and conditional execution logic.

QuantumDatalytica is a unified platform for workflow automation, data processing, and task orchestration using modular, containerized units called Quantum Machines.

The Marketplace is the central repository where machines are published, versioned, and reused across workflows and teams, reducing duplication of logic.

Onboarding path for each role:
  • Developers: Set up the CLI, create a machine, submit it for review, and publish
  • Engineers: Use the Data Factory and Workflow Designer screen to design workflows
  • Users: Run workflows, track results, and view logged infrastructure usage

Next Topic: Machine Developer

Machine Developer

Design, build, and publish Quantum Machines to power scalable, reusable, and intelligent automation workflows.

Overview

A Machine Developer is a core contributor to the QuantumDatalytica platform, responsible for building, testing, and publishing self-contained units of logic called Quantum Machines. These machines are the foundational building blocks of automated data workflows. They are designed to perform a specific task, such as transforming data, calling APIs, running analytics, or generating documents, and are built with consistency, reusability, and scalability in mind.

Machine Developers primarily work in Python, structure their logic around a standardized interface (CoreEngine), and package everything into lightweight Quantum Blocks. Each Quantum Machine must adhere to a defined project.json and input/output schema, and is validated through rigorous testing before publication. These machines are then made available to Workflow Engineers and Users, who can plug them into larger Quantum workflows or trigger them as standalone automation tasks.

The role is both creative and technical. Developers have the freedom to innovate, turning domain expertise or unique algorithms into shareable, monetizable machines, while working within a well-structured ecosystem. With Quantum-Machine CLI tools, testing environments, and version control built into the platform, developers can confidently deploy machines that scale across environments.

Whether you're solving a data challenge, building a PDF generator, or integrating external APIs, as a Machine Developer, your work directly empowers automated decision-making and workflow efficiency at scale.

Next Topic: Prerequisites

Prerequisites

Set up your local tools and environment to begin building, testing, and publishing Quantum Machines efficiently.

Before you begin building a Quantum Machine, it's essential to set up a local development environment that mirrors the runtime environment of the QuantumDatalytica platform. This ensures compatibility, smooth testing, and predictable deployment behavior.

Required Tools and Software

Tool | Purpose | Recommended Version
Python | Core language for writing machine logic | 3.9 or higher
Docker | Build and run machines as containers | Latest stable
Git | Source control and version management | Latest
Quantum CLI | Create, test, validate, and publish machines | Latest from PyPI
VS Code (or similar IDE) | Code editing and project navigation | Latest
Quantum-Core-Engine | Provides the base class CoreEngine, which every machine's main.py must inherit from; handles the standard execution flow, structured logging, and compatibility with the QuantumDatalytica runtime engine | Latest

Once these tools are installed, you're fully equipped to create and run your first Quantum Machine.
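To confirm everything is available on your PATH, you can run the following (exact version output will vary):

python --version
docker --version
git --version
quantum --help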

Install Quantum-Machine CLI

Install the Quantum CLI to scaffold, test, and publish Quantum Machines directly from your local environment.

Overview

Quantum-Machine CLI is a command-line interface developed by QuantumDatalytica LLC to help developers build, run, test, log, and manage modular analytics components called Quantum Machines. These machines are the foundation of scalable, distributed data workflows within the QuantumDatalytica ecosystem. The CLI streamlines local development and ensures consistent behavior across environments.

Installation

pip install quantum-machine

Usage

quantum --help

Commands

Command | Description
init machine | Initialize a new Quantum Machine project with boilerplate files
run machine | Run a machine and observe its behavior locally
build machine | Build a Docker image for the specified machine
test machine | Run unit tests defined for the machine
lint machine | Check the machine's code for linting/style issues
validate machine | Validate the machine's Project.json and required structure
init workflow | Initialize a new workflow YAML file with DAG structure
add machine | Add a machine as a task to a workflow and define its dependencies
run workflow | Run a workflow DAG by executing machines in topological order
logs machine | View logs from the last execution of a specified machine

Example Commands

Initialize a machine
quantum init machine HelloWorld
Creates
  • HelloWorld/main.py
  • HelloWorld/Project.json
  • HelloWorld/requirements.txt
  • HelloWorld/Dockerfile
  • HelloWorld/input.json
  • HelloWorld/output.json

Update requirements.txt

git+https://github.com/QuantumDatalytica-LLC/quantum-core-engine.git@<latest_version>

Open the generated requirements.txt and update it to include the required quantum-core-engine version.

Install dependencies (no cache)

pip install --no-cache-dir --force-reinstall -r HelloWorld/requirements.txt

Install all required Python packages without using cache to ensure a clean environment.

Run the machine

quantum run machine HelloWorld

Build the machine as a Docker image

quantum build machine HelloWorld

Builds a Docker image with dependencies for the machine.

Test your machine

quantum test machine HelloWorld

Runs the test suite defined under the machine's directory.

Lint your machine

quantum lint machine HelloWorld

Applies flake8 or equivalent linting tools to maintain code standards.

Validate machine structure

quantum validate machine HelloWorld/<file_name>

Ensures the machine has the correct Project.json, required fields, and structure.

Create a Workflow

quantum init workflow my_workflow

Creates a workflow.yaml file to define machine dependencies.

Add DAG Machine to Workflow

quantum add machine HelloWorld -w my_workflow
quantum add machine 2nd_Machine -w my_workflow
quantum add machine 3rd_Machine -p HelloWorld --workflow my_workflow
quantum add machine 4th_Machine --parent 2nd_Machine -w my_workflow
quantum add machine 5th_Machine -p 3rd_Machine 4th_Machine -w my_workflow

Run a Workflow

quantum run workflow my_workflow

Executes machines in the correct DAG order as defined in workflow.yaml.
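Given the add machine commands above, the resulting DAG looks like this:

HelloWorld  ──► 3rd_Machine ──┐
                              ├──► 5th_Machine
2nd_Machine ──► 4th_Machine ──┘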

View machine logs

quantum logs machine HelloWorld

Displays the logs from the most recent execution of the HelloWorld machine.

Install quantum-core-engine

Install the core SDK that powers every Quantum Machine and ensures standardized execution, logging, and output handling.

Overview

The quantum-core-engine is a required Python package that provides the foundational structure for all Quantum Machines. It must be installed in your development environment before writing or testing any machine logic.

pip install git+https://github.com/QuantumDatalytica-LLC/quantum-core-engine.git@<latest_version>

Run this command inside your virtual environment or project directory to keep dependencies clean and isolated. Replace <latest_version> with the latest version available on the Quantum-Core-Engine Releases page.

Objective

The Quantum-Core-Engine is more than just a library; it defines how every Quantum Machine behaves. Its goals include:

Objective | Description
Standardized Execution | Provides a CoreEngine base class that ensures all machines follow a common execution pattern.
Input Handling | Accepts structured JSON inputs and injects them into your custom logic.
Output Validation | Ensures outputs are clean, structured, and aligned with the declared schema.
Logging Support | Built-in logging methods (self.machine_logger.info(), self.machine_logger.error(), self.machine_logger.warning()) for easy debugging and traceability.
Security Hooks | Enforces safe execution policies (e.g., blocking unsafe file paths or unvalidated data access).
Sandbox Integration | Seamlessly connects to the Quantum CLI and sandbox environment for local testing and validation.

By using CoreEngine, developers don’t have to reinvent basic execution logic or worry about input/output parsing. They can focus purely on building business logic, while the framework handles the rest, making machines easier to validate, test, and integrate into Quantum Workflows.

How to use Quantum-Core-Engine

from quantum.CoreEngine import CoreEngine

class MyMachine(CoreEngine):
    """
    Inherit from CoreEngine to define your machine's logic.
    Implement the required methods such as 'receiving', 'pre_processing',
    and others to handle input and produce output.
    """

if __name__ == "__main__":
    # Initialize and execute the machine
    machine = MyMachine()
    machine.start()

Breakdown of Key Elements

  • from quantum.CoreEngine import CoreEngine:
    Imports the base class required for all Quantum Machines.
  • class MyMachine(CoreEngine):
Subclasses CoreEngine to inherit the standardized structure, input/output flow, error handling, and execution lifecycle.
  • if __name__ == "__main__":
Ensures the script can be run directly during local testing or inside a Quantum Cluster.
    machine.start() initializes the engine and runs it in a Quantum Workflow.

Build Your First Machine

Learn how to create, structure, and run your first Quantum Machine using the Quantum CLI toolkit.

Before writing any code, it's important to understand what a Quantum Machine really is: a self-contained, containerized Python codebase that takes structured input, performs a task, and returns structured output. Each machine follows a common design pattern, making it predictable, testable, and reusable.

Development starts by using the Quantum Machine CLI to scaffold a new machine with a predefined structure. You'll write your logic in main.py and prepare project.json, which defines the key parameters of the Quantum Machine; input.json specifies the expected input parameters, while output.json defines the structure and content of the machine's resulting output.

Use CLI commands to test your machine locally, simulating its execution within the actual QuantumDatalytica runtime environment.

Machine Scaffold

HelloWorld/
  • main.py  # Core logic of quantum-machine
  • Project.json  # Machine metadata
  • input.json  # Example input for testing
  • output.json  # Expected output structure
  • Dockerfile  # Cluster container definition
  • requirements.txt  # Python dependencies
  • README.md  # Documentation
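For illustration, an input.json for a machine that expects a file path (the parameter used by the Download Machine example later in this guide) might look like the following; the exact fields are defined by your machine and declared in Project.json:

{
  "file_path": "data/input.csv"
}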

Sample Machine (main.py)

from quantum.CoreEngine import CoreEngine

class MyMachine(CoreEngine):

    input_data = {}
    dependent_machine_data = {}

    def receiving(self, input_data, dependent_machine_data, callback):
        """Receiving
        :param input_data: Configured parameter values
        :param dependent_machine_data: Dependent/previous machine data values
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Review Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

            self.input_data = input_data
            self.dependent_machine_data = dependent_machine_data

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)

        finally:
            callback(data, error_list)

    def pre_processing(self, callback):
        """Pre-Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)

        finally:
            callback(data, error_list)

    def processing(self, callback):
        """Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)

        finally:
            callback(data, error_list)

    def post_processing(self, callback):
        """Post-Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)

        finally:
            callback(data, error_list)

    def packaging_shipping(self, callback):
        """Packaging & Shipping
        :return: callback method to pass data and error into the next step; this is
        the final data passed to the next machine
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)

        finally:
            callback(data, error_list)

if __name__ == '__main__':
    # Create a machine instance and start the process
    machine = MyMachine()
    machine.start()

Breakdown of Key Components

from quantum.CoreEngine import CoreEngine

This imports the CoreEngine base class from the Quantum-Core-Engine SDK. Inheriting from CoreEngine standardizes the structure, input/output flow, error handling, and lifecycle of your Quantum Machine.

input_data = {}
dependent_machine_data = {}

input_data: Holds the raw input parameters passed into the machine. These typically come from input.json or are set by the workflow engine.

dependent_machine_data: Captures output from any parent/previous machines in the workflow. Useful when a machine depends on upstream results.

receiving(self, input_data, dependent_machine_data, callback)

This method acts as the initial handshake for the machine’s execution. It captures the raw input and any output from parent machines, setting the stage for downstream logic.

Developer Responsibilities:
  • Store and validate input_data and dependent_machine_data
  • Merge or reshape inputs as needed
  • Initialize flags or internal state for conditional flows
  • Log or audit incoming data
  • Plan routing logic based on inputs (e.g., skip logic, branching)
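As a concrete illustration, a receiving() that stores and validates its inputs might look like the sketch below (the file_path check is illustrative, not a platform requirement):

def receiving(self, input_data, dependent_machine_data, callback):
    data = {}
    error_list = []
    try:
        data = self.get_final_data()
        error_list = self.get_error_list()

        # Store raw inputs for later phases
        self.input_data = input_data
        self.dependent_machine_data = dependent_machine_data

        # Illustrative validation: this machine requires a "file_path" parameter
        if not input_data.get("file_path"):
            error_list.append("Missing required input: file_path")

        # Audit incoming data via the built-in logger
        self.machine_logger.info(f"Received {len(input_data)} input parameter(s)")

    except Exception as e:
        error_list.append(f"Error : {str(e)}")
    finally:
        callback(data, error_list)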
pre_processing(self, callback)

Pre-processing prepares and transforms the initial inputs into a shape more suitable for the main computation. This stage is often used for validation, enrichment, and staging.

Developer Responsibilities:
  • Validate input formats and required fields
  • Apply data cleanup or normalization
  • Set up temporary variables or decision points
  • Fetch external data if needed (e.g., lookup tables)
  • Plan conditional logic paths for the main processing phase
processing(self, callback)

This is the core execution phase where the machine performs its primary logic or task.

Developer Responsibilities:
  • Implement the main logic (e.g., transformation, calculation, file parsing, API interaction)
  • Use input and pre-processed data to produce meaningful output
  • Manage iteration, aggregation, or ML inference steps
  • Track intermediate errors or skipped items
  • Store computed results in a structured format
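For instance, a processing() that normalizes a list of records (the "records" parameter is hypothetical) could look like this sketch:

def processing(self, callback):
    data = {}
    error_list = []
    try:
        data = self.get_final_data()
        error_list = self.get_error_list()

        # Hypothetical main logic: clean up a list of input records
        records = self.input_data.get("records", [])
        cleaned = []
        for i, rec in enumerate(records):
            if not isinstance(rec, dict):
                error_list.append(f"Skipped record {i}: not an object")
                continue
            # Normalize keys to lowercase without surrounding whitespace
            cleaned.append({str(k).strip().lower(): v for k, v in rec.items()})

        # Store computed results in a structured format
        data["cleaned_records"] = cleaned

    except Exception as e:
        error_list.append(f"Error : {str(e)}")
    finally:
        callback(data, error_list)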
post_processing(self, callback)

In this phase, the machine performs final adjustments or logging after processing is complete, preparing the output for final packaging.

Developer Responsibilities:
  • Format or finalize data structures
  • Remove temporary or redundant fields
  • Apply business rules or filters to results
  • Prepare summaries, logs, or indicators
  • Set result flags or post-execution status fields
packaging_shipping(self, callback)

This is the last phase in the lifecycle — it prepares and packages the machine’s output into a structure compatible with QuantumDatalytica’s output schema and downstream machines.

Developer Responsibilities:
  • Wrap data into the correct output format
  • Ensure output matches output.json expectations
  • Remove internal flags or non-exportable items
  • Document any errors or warnings
  • Optionally tag or version the output for traceability
if __name__ == '__main__':
    machine = MyMachine()
    machine.start()
This allows the machine to run when executed directly (locally or in a Quantum Cluster).
  • The start() method initiates the lifecycle: receiving → pre_processing → processing → post_processing → packaging_shipping.

Next Topic: File Management

File Management

Efficiently handle file operations inside your Quantum Machines with built-in APIs for reading, writing, and streaming files.

Overview

Quantum Machines often need to handle files such as data files, PDFs, Excel sheets, logs, or any file format. The Quantum-Core-Engine (QCE) provides a unified and developer-friendly interface for reading and writing files, while automatically handling:

  • Small files inline (Base64-encoded, transparent to developer)
  • Large files via efficient streaming (raw bytes, no Base64 overhead)
  • Atomic writes (ensures data consistency)
  • Simple, consistent APIs

Core Concepts

1. Unified API
  • Use read_file(file_name, as_base64=False) to read any file.
  • Use write_file(file_name, content, overwrite=True) to write any file.

The QCE decides automatically whether to inline (Base64) or stream (raw bytes) based on file size.

2. Automatic Switching
  • Small files (< 10MB by default) → returned inline as Base64.
  • Large files (≥ 10MB) → streamed as raw byte chunks.
  • The threshold is configurable via _qce_max_inline_bytes.
3. Developer-Friendly Parameters
  • No need to manage Base64 manually.
  • Just set as_base64=False if you prefer raw bytes for small files too.
  • Streaming for big files is automatic.
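As a minimal sketch (inside your CoreEngine subclass), a reader that handles both modes could look like this, assuming the return keys documented in the next sections ("inline", "as_base64", "file_content", and "file_iter"):

import base64

def save_locally(self, remote_name, local_name):
    """Copy a platform file to a local path, handling inline and streamed reads."""
    resp = self.read_file(remote_name)
    with open(local_name, "wb") as f:
        if resp.get("inline"):
            content = resp["file_content"]
            if resp.get("as_base64"):
                content = base64.b64decode(content)  # decode inline Base64
            f.write(content)  # small file: content arrived inline
        else:
            for chunk in resp["file_iter"]:  # large file: streamed raw chunks
                f.write(chunk)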

Next Topic: Reading Files

Reading Files

Retrieve file contents seamlessly, either inline for small files or via efficient streaming for large datasets, with optional Base64 encoding.

Sample Machine (read_file())

The read_file function provides a unified interface for reading files in a Quantum Machine environment. It automatically handles small files inline (e.g., base64-encoded JSON) and large files with streaming (chunked binary reading). This ensures developers can read files of any size efficiently without worrying about memory overhead.

read_file(
    self,
    filename: Union[str, Path],
    *,
    as_base64: bool = True,
    inline_limit: Optional[int] = None,
    chunk_size: Optional[int] = None
) -> dict
Parameters

Name | Type | Default | Description
filename | str or Path | (required) | Name or path of the file to read.
as_base64 | bool | True | Whether to encode the file content as Base64 (inline) or return raw bytes (via iterator). Use True for universal compatibility (JSON-safe); use False if raw binary streaming is preferred.
inline_limit | int or None | None | Maximum size for inline reads; defaults to the engine setting (e.g., 10 MiB).
chunk_size | int or None | None | Size of each streamed chunk; defaults to the engine setting (e.g., 1 MiB).
Return Value

The function returns a dictionary (dict) with metadata and file content. The structure varies slightly depending on whether the file was inlined or streamed.

Inline Mode (small files)

{
    "status_code": 200,
    "message": "File read successfully (inline)",
    "file_name": "example.txt",
    "file_size": 1024,                  # size in bytes
    "inline": True,                     # indicates inline mode
    "as_base64": True,                  # whether content is base64
    "file_content": "SGVsbG8gd29ybGQh"  # base64 string or raw bytes
}

  • status_code: 200 on success.
  • message: Human-friendly status description.
  • file_name: Name of the file read.
  • file_size: Size of the file in bytes.
  • inline: True if file content is fully inlined in the response.
  • as_base64: Whether the content was encoded in base64 (True) or kept as raw (False).
  • file_content: The actual file data (base64 string or raw bytes).
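Since file_content arrives Base64-encoded by default, decode it before using it as raw bytes, for example:

import base64

resp = self.read_file("example.txt")
raw = base64.b64decode(resp["file_content"])  # original bytes of example.txt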

Streaming Mode (large files)

{
    "status_code": 206,
    "message": "File read successfully (streaming)",
    "file_name": "bigdata.csv",
    "file_size": 1073741824,      # 1GB
    "inline": False,              # indicates streaming mode
    "as_base64": False,           # indicates raw stream
    "file_iter": ,      # generator yielding file chunks
    "chunk_size": 1048576         # size of each chunk (e.g., 1 MiB)
}
  • status_code: 206 ("Partial Content") indicates streaming.
  • message: Human-friendly description.
  • file_name: Name of the file read.
  • file_size: Size of the file in bytes.
  • inline: False, since content is not fully inlined.
  • as_base64: True or False depending on whether chunks are base64-encoded.
  • file_iter: A Python generator that yields file chunks sequentially.
  • chunk_size: The size of each yielded chunk.
Error Responses

File Not Found

{
  "status_code": 404,
  "message": "File not found or not allowed: missing.pdf",
  "filename": "missing.pdf"
} 

Permission Denied

{
  "status_code": 403,
  "message": "Permission denied for file: secret.key",
  "filename": "secret.key"
}

Unexpected Error

{
  "status_code": 500,
  "message": "Exception: [error details]",
  "filename": "data.bin"
}
Examples

Read a small text file

resp = self.read_file("config.json")
print(resp["file_content"])  # base64-encoded content

Read as bytes (for binary files like PDF/Excel)

resp = self.read_file("report.pdf", as_base64=True)
with open("local_report.pdf", "wb") as f:
    f.write(resp["file_content"])  # raw bytes

Example: Read a large file (auto streaming)

resp = self.read_file("huge_dataset.csv")

with open("huge_dataset.csv", "wb") as f:
    for chunk in resp["iterator"]:   # streamed chunks (bytes)
        f.write(chunk)

You don't need to worry about Base64 or memory limits; the engine chooses the best mode.

Next Topic: Writing Files

Writing Files

Store files reliably using raw bytes, Base64, or streaming, with built-in support for atomic writes, overwrite control, and large file handling.

Writing Files (write_file())

The write_file() method is the central file write API for Quantum Machines. It supports multiple input formats (raw bytes, base64, iterators, or existing file paths) and automatically decides the safest and most efficient way to persist the data.

write_file(
    self,
    filename: Union[str, Path],
    *,
    data: bytes | bytearray | None = None,
    base64_data: Optional[str] = None,
    data_iter: Optional[Iterable[bytes | str]] = None,
    src_path: Optional[Union[str, Path]] = None,
    overwrite: bool = True,
    chunk_size: Optional[int] = None
) -> dict
Parameters

Parameter | Type | Required | Default | Description
filename | str or Path | Yes | -- | Target filename to write into the Quantum Machine's working directory. Can be relative or absolute.
data | bytes or bytearray or None | No | None | Direct binary data buffer to write. Use this for small in-memory data. Mutually exclusive with base64_data, data_iter, and src_path.
base64_data | str or None | No | None | Base64-encoded string of file contents. Useful when data is transferred over JSON or APIs. Mutually exclusive with data, data_iter, and src_path.
data_iter | Iterable[bytes or str] or None | No | None | Iterable/stream of chunks to write incrementally. Useful for large files (e.g., reading in 4 MB pieces). Chunks can be raw bytes or Base64 strings.
src_path | str or Path or None | No | None | Copy a file from an existing local path instead of providing inline data. Handy for staging files or reusing existing outputs.
overwrite | bool | No | True | If True, overwrites an existing file. If False and the file exists, an error is returned.
chunk_size | int or None | No | None | Size (in bytes) of each chunk when writing from data_iter. Ignored for data and base64_data. Useful for throttling memory usage during streaming.
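For example, a minimal sketch of guarding against accidental overwrites, using the 409 response documented under Error Responses below:

resp = self.write_file("output/data.csv", data=b"id,value\n1,42\n", overwrite=False)
if resp["status_code"] == 409:
    # The file already exists; write under a new name instead of overwriting
    resp = self.write_file("output/data_v2.csv", data=b"id,value\n1,42\n")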
Return Value

The function always returns a dictionary containing metadata about the operation and, depending on parameters, either inline content or a streaming reference.

Key | Type | Always Present? | Description
status_code | int | Yes | HTTP-style status code. 200 = success, 400/404/500 = various errors.
filename | str | Yes | The file name that was written (relative to the Quantum Machine's working directory).
size | int | Yes | Size of the file in bytes.
mode | int | Yes | File permission bits (e.g., 420 for 0o644).
modified_at | float | Yes | Last modified timestamp (epoch seconds).
data | str (Base64) or bytes | Conditional | Inline file content. Returned if file size ≤ inline_limit (default behavior). Format depends on as_base64: True → Base64-encoded string; False → raw bytes.
chunks | list[dict] | Conditional | Present if the file is streamed because it exceeded inline_limit. Each chunk dict contains: index → chunk number; data → chunk content (Base64 or bytes).
streaming | bool | Yes | True if the file was returned in streamed chunks, False if inline.
Success Response
{
  "status_code": 200,
  "message": "File saved successfully",
  "file_path": "output/data.csv",
  "size_bytes": 2048,
  "sha256": "9d5f7ac...c5b1", 
  "mime_type": "text/csv"
}
Error Responses

Conflict (file exists, overwrite disabled)

{
  "status_code": 409,
  "message": "File exists and overwrite=False",
  "filename": "output/data.csv"
}

Invalid Input

{
  "status_code": 400,
  "message": "Provide exactly one of data, base64_data, data_iter, or src_path"
}

File System Error

{
  "status_code": 500,
  "message": "Exception: [error details]"
}
Behavior Summary
  • If file size ≤ inline_limit → returns data inline.
  • If file size > inline_limit → switches to chunked streaming, returns chunks list instead of data.
  • as_base64 toggles whether inline/chunk content is Base64 strings or raw bytes.
Examples

Write a small file (raw bytes)

self.write_file("hello.txt", data=b"Hello, Quantum!")

Write from Base64 (rarely needed manually)

import base64

content_b64 = base64.b64encode(b"My secret file").decode()
self.write_file("secret.txt", base64_data=content_b64)

Write a large file via streaming

def file_iterator(path, chunk_size=1024*1024):  # 1MB chunks
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk

self.write_file("big_backup.zip", data_iter=file_iterator("local_backup.zip"))

The engine detects iterators and automatically performs a streaming write.
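Similarly, the src_path option copies an existing local file into the machine's working directory without loading it into memory manually:

self.write_file("backup_copy.zip", src_path="local_backup.zip")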

Next Topic: Developer Example

Developer Example

Practical examples demonstrating how Quantum Machine Developers can use read_file and write_file in real workflows for seamless file handling.

Developer Example

The following examples demonstrate how to use CoreEngine’s file management APIs in a Quantum Machine. These cover both small inline files and large streamed files.


Upload Machine

from quantum.CoreEngine import CoreEngine

class MyMachine(CoreEngine):

    input_data = {}
    dependent_machine_data = {}

    def receiving(self, input_data, dependent_machine_data, callback):
        """Receiving
        :param input_data: Configured parameter values
        :param dependent_machine_data: Dependent/previous machine data values
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Review Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def pre_processing(self, callback):
        """Pre-Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()


        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def processing(self, callback):
        """Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

            def file_iterator(path, chunk_size=1024*1024):  # 1MB chunks
                with open(path, "rb") as f:
                    while chunk := f.read(chunk_size):
                        yield chunk

            res = self.write_file("UploadSampleFile.zip",
                                  data_iter=file_iterator(r"D:\SampleFile.zip"))
            data = res

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def post_processing(self, callback):
        """Post-Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()


        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def packaging_shipping(self, callback):
        """Packaging & Shipping
        :return: callback method to pass data and error into the next step;
         this is the final data passed to the next machine
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()


        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

if __name__ == '__main__':
    # Create a machine instance and start the process
    machine = MyMachine()
    machine.start()

Download Machine

from quantum.CoreEngine import CoreEngine

class MyMachine(CoreEngine):

    input_data = {}
    dependent_machine_data = {}

    # file_path get from previous machine or configure parameter
    file_path = None

    def receiving(self, input_data, dependent_machine_data, callback):
        """Receiving
        :param input_data: Configured parameter values
        :param dependent_machine_data: Dependent/previous machine data values
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Review Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

            self.input_data = input_data
            self.dependent_machine_data = dependent_machine_data

            self.file_path = self.input_data.get("file_path", None)
            if not self.file_path:
                err_msg = "File path is not provided."
                error_list.append(err_msg)

        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def pre_processing(self, callback):
        """Pre-Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()


        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def processing(self, callback):
        """Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()

            # Read the file (large files are streamed in chunks)
            resp = self.read_file(self.file_path)
            if not resp.get("inline"):
                with open("DownloadSampleFile.zip", "wb") as out:
                    for chunk in resp["file_iter"]:
                        out.write(chunk)


        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def post_processing(self, callback):
        """Post-Processing
        :return: callback method to pass data and error into next step
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()


        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

    def packaging_shipping(self, callback):
        """Packaging & Shipping
        :return: callback method to pass data and error into the next step;
         this is the final data passed to the next machine
        """
        data = {}
        error_list = []
        try:
            # Updated Final data and Error list
            data = self.get_final_data()
            error_list = self.get_error_list()


        except Exception as e:
            err_msg = f"Error : {str(e)}"
            error_list.append(err_msg)
        finally:
            callback(data, error_list)

if __name__ == '__main__':
    # Create a machine instance and start the process
    machine = MyMachine()
    machine.start()

With these utilities, Quantum Machine Developers can handle files of any size and format, safely and efficiently, without needing to think about encoding or streaming logistics.


Join the Developer Community

Connect with peers, join our Slack/Discord community, attend monthly AMAs, and get featured through the Machine Spotlight Program.
