PERIAN Flyte Agent Setup Guide

PERIAN enables serverless execution of arbitrary workloads with user-specified hardware and location requirements on multiple cloud providers. Using the PERIAN Flyte agent, you can run Flyte tasks on multiple clouds through PERIAN by simply specifying the requirements, including accelerator types, in the task configuration.


This is a step-by-step guide on how to use the PERIAN Flyte Agent. For any issues or questions, contact us.

Create a PERIAN Account

If you haven't already, sign up for an account with PERIAN. You will receive an invitation email containing the credentials you will need in the upcoming steps.

Local Testing

If you want to test the PERIAN Agent locally without deploying it on a production cluster, follow these steps:

Step 1: Install the agent

pip install flytekitplugins-perian-job

Or install the unofficial version, which includes the latest features that have not yet been approved by the Flyte maintainers:

pip install flytekitplugins-perian-job-unofficial

Step 2: Set up the required secrets locally in your /etc/secrets directory

For each secret, create a file under /etc/secrets whose name is the secret name (e.g. perian_token) and whose content is the raw secret value.

PERIAN credentials:
  • perian_organization
  • perian_token

To access the Flyte storage bucket, you need to add either AWS or GCP credentials. These credentials are never logged by PERIAN; they are stored only until they are used and are then immediately deleted.

  • AWS credentials:
    • aws_access_key_id
    • aws_secret_access_key
  • OR GCP credentials:
    • google_application_credentials: This should be the full JSON credentials.
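
The one-file-per-secret layout described above can be scripted. The following is a minimal sketch, not part of the PERIAN plugin: write_secret_files is a hypothetical helper, the placeholder values must be replaced with your own, and the demo writes to a temporary directory because /etc/secrets normally requires elevated permissions.

```python
import tempfile
from pathlib import Path


def write_secret_files(secrets: dict[str, str], directory: Path) -> list[Path]:
    """Write each secret to a file named after the secret (hypothetical helper).

    The file content is the raw secret value, with no trailing newline.
    """
    directory.mkdir(parents=True, exist_ok=True)
    written = []
    for name, value in secrets.items():
        path = directory / name
        path.write_text(value)
        # Restrict access to the current user where the platform supports it.
        path.chmod(0o600)
        written.append(path)
    return written


# Demo against a temporary directory; point `directory` at /etc/secrets
# (with sufficient permissions) for real local testing.
with tempfile.TemporaryDirectory() as tmp:
    demo_secrets = {
        "perian_organization": "<organization>",  # placeholder value
        "perian_token": "<token>",  # placeholder value
    }
    for secret_path in write_secret_files(demo_secrets, Path(tmp)):
        print(secret_path.name)
```

Add aws_access_key_id and aws_secret_access_key, or google_application_credentials, to the dictionary in the same way.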

Copy the following test code into a new wf.py file and replace <registry> with a container registry you can push to:

from flytekit import ImageSpec, task, workflow
from flytekitplugins.perian_job import PerianConfig

image_spec = ImageSpec(
    name="flyte-perian-demo",
    registry="<registry>",
    python_version="3.11",
    apt_packages=["wget", "curl", "git"],
    packages=[
        "flytekitplugins-perian-job",
    ],
)

@task(
    container_image=image_spec,
    task_config=PerianConfig(
        cores=2,
    )
)
def say_hello(name: str) -> str:
    print("Sending greetings...")
    return f"Hello, {name}!"

@workflow
def wf(name: str = 'world') -> str:
    return say_hello(name=name)
  • Since Flyte skips building the task image when running tasks locally, we need to build it manually:
pyflyte build wf.py wf
  • After the image is built and pushed, replace the container_image parameter in the @task decorator with the full image name: container_image="<registry>/flyte-perian-demo:<tag>"
  • Run the workflow, replacing <bucket> with the name of your Flyte S3 data bucket:


pyflyte run --raw-output-data-prefix=s3://<bucket> wf.py wf
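
The last two steps involve string templates that are easy to mistype. As a rough sketch, the hypothetical helpers below (image_reference and run_command are illustrative names, not part of flytekit) assemble the full image name and the pyflyte run arguments shown above; the registry, tag, and bucket values are example placeholders.

```python
import shlex


def image_reference(registry: str, tag: str, name: str = "flyte-perian-demo") -> str:
    """Build the full image name to use as container_image (hypothetical helper)."""
    return f"{registry.rstrip('/')}/{name}:{tag}"


def run_command(bucket: str, script: str = "wf.py", workflow: str = "wf") -> list[str]:
    """Build the argument list for the pyflyte run command from this guide."""
    return [
        "pyflyte",
        "run",
        f"--raw-output-data-prefix=s3://{bucket}",
        script,
        workflow,
    ]


# Example placeholders; substitute your own registry, tag, and bucket.
print(image_reference("registry.example.com/team", "v1"))
print(shlex.join(run_command("my-flyte-bucket")))
```

The argument list can be passed to subprocess.run as-is if you prefer to launch the workflow from a script.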

Production Flyte Cluster

Step 1: Enable Flyte Agent Service

If you have never used a Flyte Agent on your cluster before, you need to enable it first.

1. In the Flyte Helm values, add the agent-service to the list of enabled plugins:

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service

2. Append the following block:

flyteagent:
  # Flag to enable the bundled Flyte Agent
  enabled: true

Step 2: Set up the PERIAN agent

1. In the Flyte Helm values, associate tasks of type perian_task with the agent service:

tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - agent-service
    default-for-task-types:
      - container: container
      - container_array: K8S-ARRAY
      - perian_task: agent-service

2. Change the Flyte Agent service image to an image that includes the PERIAN agent:

Start editing the flyteagent deployment on your cluster:

kubectl edit deployment flyteagent -n flyte

Replace the image parameter (which should be something like cr.flyte.org/flyteorg/flyteagent...) with:

image: ghcr.io/perian-io/flyte-agent:latest

Or the unofficial release that includes the latest agent features:

image: ghcr.io/perian-io/flyte-agent-unofficial:latest

The images are built based on this repo.

After 1-2 minutes, verify that the Flyte Agent service is running with the PERIAN agent. The following command prints the agent logs, which should list all running agents. Replace <flyteagent-pod> with the full name of the flyteagent pod:

kubectl logs <flyteagent-pod> -n flyte
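
The task routing from step 1 can also be checked before the Helm values are rolled out. The sketch below is a hypothetical helper (check_perian_routing is not part of Flyte or PERIAN). It assumes the tasks block has already been loaded into a Python dict, for example with a YAML parser, and that default-for-task-types is either a mapping or a list of single-key mappings as in the values shown above.

```python
def check_perian_routing(tasks_config: dict) -> list[str]:
    """Return a list of problems found in the `tasks` block (hypothetical helper)."""
    problems = []
    plugins = tasks_config.get("task-plugins", {})

    # The agent service must be enabled as a plugin.
    if "agent-service" not in plugins.get("enabled-plugins", []):
        problems.append("agent-service is missing from enabled-plugins")

    # Accept either one mapping or a list of single-key mappings.
    raw_defaults = plugins.get("default-for-task-types", [])
    defaults = {}
    if isinstance(raw_defaults, dict):
        defaults.update(raw_defaults)
    else:
        for entry in raw_defaults:
            defaults.update(entry)

    # Tasks of type perian_task must be handled by the agent service.
    if defaults.get("perian_task") != "agent-service":
        problems.append("perian_task is not mapped to agent-service")
    return problems


# The same structure as the Helm values in this guide, as a Python dict.
tasks_config = {
    "task-plugins": {
        "enabled-plugins": ["container", "sidecar", "K8S-ARRAY", "agent-service"],
        "default-for-task-types": [
            {"container": "container"},
            {"container_array": "K8S-ARRAY"},
            {"perian_task": "agent-service"},
        ],
    }
}

print(check_perian_routing(tasks_config) or "routing looks correct")
```

An empty list means both conditions hold; otherwise each entry names the setting to fix.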

Step 3: Set the credentials as secrets

Required secrets:
  • PERIAN credentials:
    • perian_organization
    • perian_token
  • AWS credentials:
    • aws_access_key_id
    • aws_secret_access_key
  • OR GCP credentials:
    • google_application_credentials: This should be the full JSON credentials.

Optional secrets:
  • Private docker registry credentials:
    • docker_registry_url
    • docker_registry_username
    • docker_registry_password

For each of the secrets listed above, repeat the following steps:

1. Convert the secret to base64, replacing <secret-value>:

SECRET_VALUE=$(echo -n "<secret-value>" | base64 -w 0)

2. Add the secret to the flyteagent secret, replacing <secret-name>:

kubectl patch secret flyteagent -n flyte --patch "{\"data\":{\"<secret-name>\":\"$SECRET_VALUE\"}}"

3. (Optional) Verify the secret value written, replacing <secret-name>:

kubectl get secret flyteagent -n flyte -o jsonpath='{.data.<secret-name>}' | base64 --decode

Or verify all secrets written (values are base64 encoded):

kubectl edit secret flyteagent -n flyte
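
Escaping quotes inside the --patch argument by hand is error-prone, especially for the JSON credentials. As a minimal sketch, the hypothetical helper below (secret_patch is an illustrative name) performs the base64 encoding from step 1 and builds the JSON patch document used in step 2; the secret name and value are placeholders.

```python
import base64
import json


def secret_patch(name: str, value: str) -> str:
    """Build the JSON patch for one secret entry (hypothetical helper).

    The value is base64 encoded, as Kubernetes expects under `data`.
    """
    encoded = base64.b64encode(value.encode("utf-8")).decode("ascii")
    return json.dumps({"data": {name: encoded}})


# Placeholder name and value; replace with a real secret before use.
patch = secret_patch("perian_token", "<secret-value>")
print(patch)
```

The printed string can be supplied as the --patch argument of the kubectl patch command in step 2, wrapped in single quotes.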

Step 4: Run a test workflow

  • Run the following example workflow, replacing <registry> with a container registry you can push to:
from flytekit import ImageSpec, task, workflow
from flytekitplugins.perian_job import PerianConfig

image_spec = ImageSpec(
    name="flyte-perian-demo",
    registry="<registry>",
    python_version="3.11",
    apt_packages=["wget", "curl", "git"],
    packages=[
        "flytekitplugins-perian-job",
    ],
)

@task(
    container_image=image_spec,
    task_config=PerianConfig(
        cores=2,
    )
)
def say_hello(name: str) -> str:
    print("Sending greetings...")
    return f"Hello, {name}!"

@workflow
def wf(name: str = 'world') -> str:
    return say_hello(name=name)