Flyte - Sky Platform Integration
We offer a Flyte Agent for executing Flyte tasks on PERIAN Sky Platform. To install the plugin, run the following command:
pip install flytekitplugins-perian-job
Note: To use the latest features that have not yet been approved by the Flyte maintainers, install the flytekitplugins-perian-job-unofficial package instead.
Getting Started
This plugin lets you execute a PythonFunctionTask or a raw ContainerTask on PERIAN Sky Platform. For a PythonFunctionTask, you need to build an ImageSpec with the PERIAN agent plugin installed.
See the examples below for how to use it.
Parameters
The following parameters set the requirements for the PERIAN task. Any requirement that is omitted is replaced with the cheapest available option. At least one requirement value should be set.
- cores: Number of CPU cores
- memory: Amount of memory in GB
- accelerators: Number of accelerators
- accelerator_type: Type of accelerator (e.g. 'A100'). For a full list of supported accelerators, use the perian CLI list-accelerators command
- country_code: Country code to run the job in (e.g. 'DE')
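The rule that at least one requirement should be set can be checked before a task is submitted. The following is a minimal sketch, not part of the plugin: validate_requirements and REQUIREMENT_KEYS are hypothetical names, and the function works on a plain dictionary rather than on the plugin's own configuration class.

```python
from typing import Any, Dict

# Requirement names taken from the parameter list above.
REQUIREMENT_KEYS = ("cores", "memory", "accelerators", "accelerator_type", "country_code")


def validate_requirements(requirements: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: return only the requirements that are actually set.

    Raises ValueError for unknown names or when nothing is set.
    """
    unknown = sorted(set(requirements) - set(REQUIREMENT_KEYS))
    if unknown:
        raise ValueError(f"unknown requirement(s): {', '.join(unknown)}")

    # Requirements left as None count as skipped.
    provided = {key: value for key, value in requirements.items() if value is not None}
    if not provided:
        raise ValueError("at least one requirement value should be set")
    return provided


if __name__ == "__main__":
    print(validate_requirements({"accelerators": 1, "accelerator_type": "A100", "cores": None}))
```

Skipped requirements are simply dropped from the result here; choosing the cheapest option for them is left to the platform, as described above.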
Credentials
The following secrets must be defined for the agent server:
- PERIAN credentials:
  - perian_organization
  - perian_token
- To access the Flyte storage bucket, you need to add either AWS or GCP credentials. These credentials are never logged by PERIAN and are stored only until they are used, then immediately deleted.
  - AWS credentials:
    - aws_access_key_id
    - aws_secret_access_key
  - GCP credentials:
    - google_application_credentials: This should be the full JSON credentials.
- (Optional) Custom Docker registry for pulling the Flyte image:
  - docker_registry_url
  - docker_registry_username
  - docker_registry_password
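The credential rules above (both PERIAN secrets, plus either the AWS pair or the GCP secret) can be expressed as a small pre-flight check. This is a sketch under stated assumptions: check_agent_secrets is a hypothetical helper that only inspects secret names; how the secrets are actually supplied to the agent server is not covered here.

```python
from typing import Iterable, List

# Secret names taken from the list above.
PERIAN_SECRETS = {"perian_organization", "perian_token"}
AWS_SECRETS = {"aws_access_key_id", "aws_secret_access_key"}
GCP_SECRETS = {"google_application_credentials"}


def check_agent_secrets(defined: Iterable[str]) -> List[str]:
    """Hypothetical helper: list the problems found in a set of defined secret names."""
    names = set(defined)

    # Both PERIAN secrets are required.
    problems = [f"missing PERIAN secret: {name}" for name in sorted(PERIAN_SECRETS - names)]

    # Storage access needs one complete credential set, AWS or GCP.
    if not (AWS_SECRETS <= names or GCP_SECRETS <= names):
        problems.append("missing storage credentials: define the AWS pair or the GCP secret")
    return problems


if __name__ == "__main__":
    for problem in check_agent_secrets(["perian_token", "aws_access_key_id"]):
        print(problem)
```

The optional Docker registry secrets are deliberately left out of the check, since the agent works without them.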
Example
Example using a PythonFunctionTask:
from flytekit import ImageSpec, task, workflow
from flytekitplugins.perian_job import PerianConfig

image_spec = ImageSpec(
    name="flyte-test",
    registry="my-registry",
    python_version="3.11",
    apt_packages=["wget", "curl", "git"],
    packages=[
        "flytekitplugins-perian-job",
    ],
)


@task(
    container_image=image_spec,
    task_config=PerianConfig(
        accelerators=1,
        accelerator_type="A100",
    ),
)
def perian_hello(name: str) -> str:
    return f"hello {name}!"


@workflow
def my_wf(name: str = "world") -> str:
    return perian_hello(name=name)
Example using a ContainerTask:
from flytekit import kwtypes, workflow
from flytekitplugins.perian_job import PerianConfig, PerianContainerTask

perian_hello = PerianContainerTask(
    name="perian_hello",
    task_config=PerianConfig(
        accelerators=1,
        accelerator_type="A100",
    ),
    image="alpine",
    environment={"MYKEY": "MYVALUE"},
    inputs=kwtypes(name=str),
    command=[
        "echo",
        "hello {{.inputs.name}}!",
    ],
)


# The container task declares no outputs, so the workflow returns nothing.
@workflow
def my_wf(name: str = "world"):
    perian_hello(name=name)