PERIAN Flyte Agent Setup Guide
PERIAN enables serverless execution of arbitrary workloads with user-specified hardware and location requirements on multiple cloud providers. Using the PERIAN Flyte agent, you can run Flyte tasks on multiple clouds through PERIAN by simply specifying the requirements, including accelerator types, in the task configuration.
This is a step-by-step guide on how to use the PERIAN Flyte Agent. For any issues or questions, contact us.
Create a PERIAN Account
If you haven't already, sign-up for an account with PERIAN. You will receive an invitation email with your credentials that you will need in the upcoming steps.
Local Testing
If you want to test the PERIAN Agent locally without deploying it on a production cluster, follow these steps:
- 1. Install the agent:
pip install flytekitplugins-perian-job
PERIAN credentials:
- perian_organization
- perian_token
For accessing the Flyte storage bucket, you need to add either AWS or GCP credentials. These credentials are never logged by PERIAN and are only stored until then are used, then immediately deleted.
-
AWS credentials:
- aws_access_key_id
- aws_secret_access_key
-
GCP credentials:
- google_application_credentials: This should be the full json credentials.
Copy the following test code into a new wf.py file and replace <registry> with a container register you can push to:
from flytekit import ImageSpec, task, workflow
from flytekitplugins.perian_job import PerianConfig
image_spec = ImageSpec(
name="flyte-perian-demo",
registry="<registry>",
python_version="3.11",
apt_packages=["wget", "curl", "git"],
packages=[
"flytekitplugins-perian-job",
],
)
@task(
container_image=image_spec,
task_config=PerianConfig(
cores=2,
)
)
def say_hello(name: str) -> str:
print("Sending greetings...")
return f"Hello, {name}!"
@workflow
def wf(name: str = 'world') -> str:
return say_hello(name=name)
- Since Flyte skips building the task image when running tasks locally, we need to build it manually:
pyflyte build wf.py wf
- After the image is built and pushed, replace the container_image parameter in the @task decorator with the full image name: container_image="<registry>/flyte-perian-demo:<tag>"
- Run the workflow, replacing <bucket> with the name of your Flyte S3 data bucket:
pyflyte run --raw-output-data-prefix=s3://<bucket> wf.py wf
Production Flyte Cluster
To run PERIAN tasks on a production Flyte cluster, you need to setup the PERIAN agent.
- Since the PERIAN agent is not part of the official Flyte agent image, you need to build a new agent image. Here is an example agent image Dockerfile. You should add any other agents you use, then build and push the image to a remote or cluster registry.
FROM python:3.10-slim-bookworm
RUN apt-get update && apt-get install -y \
build-essential \
git \
&& apt-get clean autoclean \
&& apt-get autoremove --yes \
&& rm -rf /var/lib/{apt,dpkg,cache,log}/
RUN pip install --no-cache-dir \
prometheus-client \
grpcio-health-checking \
flytekit \
flytekitplugins-perian-job \
pydantic==2.5.2
CMD ["pyflyte", "serve", "agent", "--port", "8000"]
- Start editing the flyteagent deployment on your k8s cluster:
kubectl edit deployment flyteagent -n flyte
- Replace the image parameter (which should be something like cr.flyte.org/flyteorg/flyteagent...) with the new agent image, then wait a few minutes.
- Repeat the following steps for each of the secrets (see secrets section above): Convert secret to base64:
echo -n "<secret>" | base64
- Add secret to the flyteagent:
apiVersion: v1
kind: Secret
metadata:
name: flyteagent
namespace: flyte
data:
<secret1>: <BASE64_ENCODED_SECRET>
# ... other data fields as needed
- Run the following example workflow, replacing <registry> with a container register you can push to:
from flytekit import ImageSpec, task, workflow
from flytekitplugins.perian_job import PerianConfig
image_spec = ImageSpec(
name="flyte-perian-demo",
registry="<registry>",
python_version="3.11",
apt_packages=["wget", "curl", "git"],
packages=[
"flytekitplugins-perian-job",
],
)
@task(
container_image=image_spec,
task_config=PerianConfig(
cores=2,
)
)
def say_hello(name: str) -> str:
print("Sending greetings...")
return f"Hello, {name}!"
@workflow
def wf(name: str = 'world') -> str:
return say_hello(name=name)