Prefect

Author

Benedict Thekkel

Start prefect server

prefect server start

The UI is served at http://127.0.0.1:4200

Change settings

prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"

View settings

prefect config view --show-defaults

Flows

Flows can be thought of as special types of functions. They can take inputs, perform work, and return an output. In fact, you can turn any function into a Prefect flow by adding the @flow decorator. When a function becomes a flow, its behavior changes: Prefect tracks each run and its state, and the flow can validate its parameters, retry on failure, and enforce a timeout (see the flow settings below).
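
To make "a decorator changes how the function behaves" concrete, here is a toy sketch in plain Python. It is not Prefect's implementation: `toy_flow` and its `states` list are hypothetical names, and the only behavior modeled is recording whether each call ended as Completed or Failed.

```python
import functools

def toy_flow(fn):
    """Hypothetical stand-in for @flow: records the final state of each call."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            result = fn(*args, **kwargs)
        except Exception:
            wrapper.states.append("Failed")
            raise
        wrapper.states.append("Completed")
        return result

    wrapper.states = []  # one entry per run, oldest first
    return wrapper

@toy_flow
def add(a, b):
    return a + b

@toy_flow
def broken():
    raise ValueError("boom")

total = add(1, 2)
try:
    broken()
except ValueError:
    pass  # the error still propagates, but the state was recorded first

print(total, add.states, broken.states)
```

The decorated functions are still called like ordinary functions; the wrapper only adds bookkeeping around the call.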

Tasks called within flows

from prefect import flow, task

@task
def print_hello(name):
    print(f"Hello {name}!")

@flow(name="Hello Flow")
def hello_world(name="world"):
    print_hello(name)

import datetime
from prefect import flow

@flow(flow_run_name="{name}-on-{date:%A}")
def my_flow(name: str, date: datetime.datetime):
    pass

# creates a flow run named like 'marvin-on-Thursday' (the weekday depends on the run date)
my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))

18:17:46.247 | INFO    | prefect.engine - Created flow run 'placid-finch' for flow 'my-flow'
18:17:46.320 | INFO    | Flow run 'placid-finch' - Finished in state Completed()

Flow settings

Flows allow a great deal of configuration by passing arguments to the decorator. Flows accept the following optional settings.

| Argument | Description |
| --- | --- |
| description | An optional string description for the flow. If not provided, the description will be pulled from the docstring for the decorated function. |
| name | An optional name for the flow. If not provided, the name will be inferred from the function. |
| retries | An optional number of times to retry on flow run failure. |
| retry_delay_seconds | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if retries is nonzero. |
| flow_run_name | An optional name to distinguish runs of this flow. It can be a string template with the flow's parameters as variables, or a function that returns a string. |
| task_runner | An optional task runner to use for task execution within the flow when you .submit() tasks. If not provided and you .submit() tasks, the ConcurrentTaskRunner will be used. |
| timeout_seconds | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it will be marked as failed. Flow execution may continue until the next task is called. |
| validate_parameters | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is True. |
| version | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |
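
The flow_run_name template uses ordinary Python format-string syntax, so a template can be previewed without running a flow. The sketch below assumes the template is filled in with the flow's parameters in the style of `str.format`; the parameter values are made up for illustration.

```python
import datetime

# Same template as in the my_flow example; {date:%A} renders the weekday name.
template = "{name}-on-{date:%A}"

# Hypothetical parameter values; 2021-01-01 was a Friday.
parameters = {
    "name": "marvin",
    "date": datetime.datetime(2021, 1, 1, tzinfo=datetime.timezone.utc),
}

run_name = template.format(**parameters)
print(run_name)  # marvin-on-Friday
```

When the name needs more than the flow's own parameters, the function form of flow_run_name is the more flexible option.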

Flow Example

from prefect import flow, task
import datetime
from prefect.runtime import flow_run
from prefect.task_runners import SequentialTaskRunner

def generate_flow_run_name():
    # prefect.runtime exposes the name and parameters of the current flow run
    flow_name = flow_run.flow_name
    parameters = flow_run.parameters
    name = parameters["name"]
    date = datetime.datetime.now(datetime.timezone.utc)

    return f"flow run name test: {flow_name}-with-{name}: {date:%A}"

@task(name="task test")
def print_hello(name:str) -> str:
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="flow test",
      description="flow test description",
      task_runner=SequentialTaskRunner(),
      flow_run_name=generate_flow_run_name
      )
def hello_world(name:str ="world") -> None:
    message = print_hello(name)

hello_world("Ben")
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:348: UserWarning: A task named 'task test' and defined at '/tmp/ipykernel_23433/1543666974.py:16' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'flow test' and defined at '/tmp/ipykernel_23433/1543666974.py:22' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
18:31:47.848 | INFO    | prefect.engine - Created flow run 'caped-swan' for flow 'flow test'
18:31:47.927 | INFO    | Flow run 'flow run name test: flow test-with-Ben: Wednesday' - Created task run 'task test-0' for task 'task test'
18:31:47.929 | INFO    | Flow run 'flow run name test: flow test-with-Ben: Wednesday' - Executing 'task test-0' immediately...
Hello Ben!
18:31:47.998 | INFO    | Task run 'task test-0' - Finished in state Completed()
18:31:48.025 | INFO    | Flow run 'caped-swan' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`'))]

import graphviz
from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@task(name="Print Hello Again")
def print_hello_again(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Hello Flow")
def hello_world(name="world"):
    message = print_hello(name)
    message2 = print_hello_again(message)

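# The output below is a coroutine object, which suggests visualize() was not
# awaited. In a notebook cell, `await hello_world.visualize()` should render the graph.
hello_world.visualize()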
<coroutine object Flow.visualize>

Subflows

A subflow run is created when a flow function is called inside the execution of another flow. The primary flow is the “parent” flow. The flow created within the parent is the “child” flow or “subflow.”

Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately. When a subflow starts, it will create a new task runner for tasks within the subflow. When the subflow completes, the task runner is shut down.

from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Subflow")
def my_subflow(msg):
    print(f"Subflow says: {msg}")

@flow(name="Hello Flow")
def hello_world(name="world"):
    for _ in range(3):
        message = print_hello(name)
        my_subflow(message)

hello_world("Marvin")
18:41:19.292 | INFO    | prefect.engine - Created flow run 'wonderful-wolf' for flow 'Hello Flow'
18:41:19.356 | INFO    | Flow run 'wonderful-wolf' - Created task run 'Print Hello-0' for task 'Print Hello'
18:41:19.358 | INFO    | Flow run 'wonderful-wolf' - Executing 'Print Hello-0' immediately...
Hello Marvin!
18:41:19.430 | INFO    | Task run 'Print Hello-0' - Finished in state Completed()
18:41:19.504 | INFO    | Flow run 'wonderful-wolf' - Created subflow run 'aspiring-aardwark' for flow 'Subflow'
Subflow says: Hello Marvin!
18:41:19.585 | INFO    | Flow run 'aspiring-aardwark' - Finished in state Completed()
18:41:19.607 | INFO    | Flow run 'wonderful-wolf' - Created task run 'Print Hello-1' for task 'Print Hello'
18:41:19.608 | INFO    | Flow run 'wonderful-wolf' - Executing 'Print Hello-1' immediately...
Hello Marvin!
18:41:19.672 | INFO    | Task run 'Print Hello-1' - Finished in state Completed()
18:41:19.738 | INFO    | Flow run 'wonderful-wolf' - Created subflow run 'garrulous-starling' for flow 'Subflow'
Subflow says: Hello Marvin!
18:41:19.815 | INFO    | Flow run 'garrulous-starling' - Finished in state Completed()
18:41:19.839 | INFO    | Flow run 'wonderful-wolf' - Created task run 'Print Hello-2' for task 'Print Hello'
18:41:19.840 | INFO    | Flow run 'wonderful-wolf' - Executing 'Print Hello-2' immediately...
Hello Marvin!
18:41:19.904 | INFO    | Task run 'Print Hello-2' - Finished in state Completed()
18:41:19.976 | INFO    | Flow run 'wonderful-wolf' - Created subflow run 'snobbish-rottweiler' for flow 'Subflow'
Subflow says: Hello Marvin!
18:41:20.056 | INFO    | Flow run 'snobbish-rottweiler' - Finished in state Completed()
18:41:20.084 | INFO    | Flow run 'wonderful-wolf' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

Tip: Subflows or tasks?

In Prefect you can call tasks or subflows to do work within your workflow, including passing results from other tasks to your subflow. So a common question is:

"When should I use a subflow instead of a task?"

We recommend writing tasks that do a discrete, specific piece of work in your workflow: calling an API, performing a database operation, analyzing or transforming a data point.
Prefect tasks are well suited to parallel or distributed execution using distributed computation frameworks such as Dask or Ray.
For troubleshooting, the more granular your tasks are, the easier it is to find and fix issues when a task fails.

Subflows enable you to group related tasks within your workflow. 
Here are some scenarios where you might choose to use a subflow rather than calling tasks individually:

- Observability: Subflows, like any other flow run, have first-class observability within the Prefect UI and Prefect Cloud. You'll see subflow status in the **Flow Runs** dashboard rather than having to dig down into the tasks within a specific flow run. See [Final state determination](#final-state-determination) for some examples of leveraging task state within flows.
- Conditional flows: If you have a group of tasks that run only under certain conditions, you can group them within a subflow and conditionally run the subflow rather than each task individually.
- Parameters: Flows have first-class support for parameterization, making it easy to run the same group of tasks in different use cases by simply passing different parameters to the subflow in which they run.
- Task runners: Subflows enable you to specify the task runner used for tasks within the flow. For example, if you want to optimize parallel execution of certain tasks with Dask, you can group them in a subflow that uses the Dask task runner. You can use a different task runner for each subflow.

from datetime import datetime, timezone
from typing import Optional

from prefect import flow

@flow
def what_day_is_it(date: Optional[datetime] = None):
    if date is None:
        date = datetime.now(timezone.utc)
    print(f"It was {date.strftime('%A')} on {date.isoformat()}")

# The ISO 8601 string is converted to a datetime because `date` is annotated as one
what_day_is_it("2021-01-01T02:00:19.180906")
# It was Friday on 2021-01-01T02:00:19.180906
18:49:42.898 | INFO    | prefect.engine - Created flow run 'vermilion-spider' for flow 'what-day-is-it'
It was Friday on 2021-01-01T02:00:19.180906
18:49:42.963 | INFO    | Flow run 'vermilion-spider' - Finished in state Completed()
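
The call above passes a string where a datetime is annotated, and the flow still prints a weekday. With validate_parameters left at its default of True, Pydantic is what converts the string. As a rough stdlib stand-in for that conversion (an approximation, not Pydantic's actual parsing), `datetime.fromisoformat` reproduces the same message:

```python
from datetime import datetime

raw = "2021-01-01T02:00:19.180906"    # the string passed to what_day_is_it
parsed = datetime.fromisoformat(raw)  # stdlib parse, standing in for Pydantic's coercion

message = f"It was {parsed.strftime('%A')} on {parsed.isoformat()}"
print(message)  # It was Friday on 2021-01-01T02:00:19.180906
```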

Tasks

Task arguments

Tasks allow for customization through optional arguments:

| Argument | Description |
| --- | --- |
| name | An optional name for the task. If not provided, the name will be inferred from the function name. |
| description | An optional string description for the task. If not provided, the description will be pulled from the docstring for the decorated function. |
| tags | An optional set of tags to be associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime. |
| cache_key_fn | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result will be restored instead of running the task again. |
| cache_expiration | An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire. |
| retries | An optional number of times to retry on task run failure. |
| retry_delay_seconds | An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero. |
| log_prints | An optional boolean indicating whether to log print statements. |
| persist_result | An optional boolean indicating whether to persist the result of the task run to storage. |
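
The interplay of cache_key_fn and cache_expiration can be sketched in plain Python. This is a simplified model, not Prefect's caching code: `cached_call` and `_cache` are hypothetical, the key function here receives only the call arguments (Prefect's receives the task run context and the parameters), and an expiration of None means "never expire", as described in the table.

```python
import time

_cache = {}  # key -> (stored_at, result); hypothetical in-memory store

def cached_call(fn, key_fn, expiration_seconds, *args):
    """Run fn(*args) unless a fresh result is stored under key_fn(*args)."""
    key = key_fn(*args)
    now = time.monotonic()
    if key in _cache:
        stored_at, result = _cache[key]
        if expiration_seconds is None or now - stored_at < expiration_seconds:
            return result  # cache hit: fn is not run again
    result = fn(*args)
    _cache[key] = (now, result)
    return result

calls = []

def square(x):
    calls.append(x)  # record every real execution
    return x * x

def square_key(x):
    return f"square-{x}"

first = cached_call(square, square_key, None, 4)   # runs square
second = cached_call(square, square_key, None, 4)  # same key, restored from the cache
third = cached_call(square, square_key, None, 5)   # different key, runs square
print(first, second, third, calls)  # 16 16 25 [4, 5]
```

The key function decides what counts as "the same call": two invocations that produce the same key share one result until it expires.
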

from prefect import flow, task

@task
def my_first_task(msg):
    print(f"Hello, {msg}")

@task
def my_second_task(msg):
    # .fn calls the undecorated function directly, so no separate task run is created
    my_first_task.fn(msg)
    return msg

@flow(flow_run_name = "task test1")
def my_flow(msg: str = "Trillian"):
    my_second_task(msg)
    return msg

my_flow()
19:04:31.723 | INFO    | prefect.engine - Created flow run 'fluorescent-bittern' for flow 'my-flow'
19:04:31.802 | INFO    | Flow run 'task test1' - Created task run 'my_second_task-0' for task 'my_second_task'
19:04:31.804 | INFO    | Flow run 'task test1' - Executing 'my_second_task-0' immediately...
Hello, Trillian
19:04:31.873 | INFO    | Task run 'my_second_task-0' - Finished in state Completed()
19:04:31.904 | INFO    | Flow run 'fluorescent-bittern' - Finished in state Completed()
'Trillian'
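
In the log above only my_second_task-0 appears as a task run, because my_first_task.fn(msg) calls the original function rather than the task. The pattern can be sketched with a toy wrapper; `ToyTask` is hypothetical and is not Prefect's Task class.

```python
class ToyTask:
    """Hypothetical stand-in for a decorated task; not Prefect's Task class."""

    def __init__(self, fn):
        self.fn = fn   # the original, undecorated function
        self.runs = 0  # how many tracked runs were created

    def __call__(self, *args, **kwargs):
        self.runs += 1  # a normal call is tracked as a run
        return self.fn(*args, **kwargs)

@ToyTask
def greet(msg):
    return f"Hello, {msg}"

tracked = greet("Trillian")       # goes through __call__, counted as a run
untracked = greet.fn("Trillian")  # bypasses the wrapper, not counted
print(tracked, untracked, greet.runs)  # Hello, Trillian Hello, Trillian 1
```

Both calls return the same value; the difference is only in what gets tracked.
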

from prefect import flow, task
from prefect.runtime import flow_run, task_run

def generate_task_name():
    flow_name = flow_run.flow_name
    task_name = task_run.task_name

    parameters = task_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-{task_name}-with-{name}-and-{limit}"

@task(name="my-example-task",
      description="An example task for a tutorial.",
      task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
    pass

@flow
def my_flow(name: str):
    # creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
    my_task(name="marvin")

Tags

from prefect import task

@task(name="hello-task", tags=["test"])
def my_task():
    print("Hello, I'm a task")

Retries

import httpx

from prefect import flow, task


@task(retries=2, retry_delay_seconds=5)
def get_data_task(
    url: str = "https://api.brittle-service.com/endpoint"
) -> dict:
    response = httpx.get(url)

    # If the response status code is anything but a 2xx, httpx will raise
    # an exception. This task doesn't handle the exception, so Prefect will
    # catch the exception and will consider the task run failed.
    response.raise_for_status()

    return response.json()


@flow
def get_data_flow():
    get_data_task()

get_data_flow()
19:08:29.596 | INFO    | prefect.engine - Created flow run 'almond-spider' for flow 'get-data-flow'
19:08:29.655 | INFO    | Flow run 'almond-spider' - Created task run 'get_data_task-0' for task 'get_data_task'
19:08:29.658 | INFO    | Flow run 'almond-spider' - Executing 'get_data_task-0' immediately...
19:08:29.859 | ERROR   | Task run 'get_data_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
    yield
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 233, in handle_request
    resp = self._pool.handle_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 216, in handle_request
    raise exc from None
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 196, in handle_request
    response = connection.handle_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 99, in handle_request
    raise exc
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 76, in handle_request
    stream = self._connect(request)
             ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 122, in _connect
    stream = self._network_backend.connect_tcp(**kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_backends/sync.py", line 205, in connect_tcp
    with map_exceptions(exc_map):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.ConnectError: [Errno -2] Name or service not known

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23433/1331835922.py", line 10, in get_data_task
    response = httpx.get(url)
               ^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 198, in get
    return request(
           ^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 106, in request
    return client.request(
           ^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 827, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 979, in _send_handling_redirects
    response = self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 1015, in _send_single_request
    response = transport.handle_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 232, in handle_request
    with map_httpcore_exceptions():
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ConnectError: [Errno -2] Name or service not known
19:08:29.893 | INFO    | Task run 'get_data_task-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
19:08:34.986 | ERROR   | Task run 'get_data_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  ...
httpx.ConnectError: [Errno -2] Name or service not known
19:08:35.018 | INFO    | Task run 'get_data_task-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
19:08:40.124 | ERROR   | Task run 'get_data_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  ...
httpx.ConnectError: [Errno -2] Name or service not known
19:08:40.154 | ERROR   | Task run 'get_data_task-0' - Finished in state Failed('Task run encountered an exception ConnectError: [Errno -2] Name or service not known')
19:08:40.156 | ERROR   | Flow run 'almond-spider' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
    yield
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 233, in handle_request
    resp = self._pool.handle_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 216, in handle_request
    raise exc from None
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 196, in handle_request
    response = connection.handle_request(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 99, in handle_request
    raise exc
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 76, in handle_request
    stream = self._connect(request)
             ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 122, in _connect
    stream = self._network_backend.connect_tcp(**kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_backends/sync.py", line 205, in connect_tcp
    with map_exceptions(exc_map):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
    raise to_exc(exc) from exc
httpcore.ConnectError: [Errno -2] Name or service not known

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 867, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23433/1331835922.py", line 22, in get_data_flow
    get_data_task()
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py", line 600, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1601, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23433/1331835922.py", line 10, in get_data_task
    response = httpx.get(url)
               ^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 198, in get
    return request(
           ^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 106, in request
    return client.request(
           ^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 827, in request
    return self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 914, in send
    response = self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth
    response = self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 979, in _send_handling_redirects
    response = self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 1015, in _send_single_request
    response = transport.handle_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 232, in handle_request
    with map_httpcore_exceptions():
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
    raise mapped_exc(message) from exc
httpx.ConnectError: [Errno -2] Name or service not known
19:08:40.187 | ERROR   | Flow run 'almond-spider' - Finished in state Failed('Flow run encountered an exception. ConnectError: [Errno -2] Name or service not known')
---------------------------------------------------------------------------
ConnectError                              Traceback (most recent call last)
File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py:69, in map_httpcore_exceptions()
     68 try:
---> 69     yield
     70 except Exception as exc:

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py:233, in HTTPTransport.handle_request(self, request)
    232 with map_httpcore_exceptions():
--> 233     resp = self._pool.handle_request(req)
    235 assert isinstance(resp.stream, typing.Iterable)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py:216, in ConnectionPool.handle_request(self, request)
    215     self._close_connections(closing)
--> 216     raise exc from None
    218 # Return the response. Note that in this case we still have to manage
    219 # the point at which the response is closed.

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py:196, in ConnectionPool.handle_request(self, request)
    194 try:
    195     # Send the request on the assigned connection.
--> 196     response = connection.handle_request(
    197         pool_request.request
    198     )
    199 except ConnectionNotAvailable:
    200     # In some cases a connection may initially be available to
    201     # handle a request, but then become unavailable.
    202     #
    203     # In this case we clear the connection and try again.

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py:99, in HTTPConnection.handle_request(self, request)
     98     self._connect_failed = True
---> 99     raise exc
    101 return self._connection.handle_request(request)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py:76, in HTTPConnection.handle_request(self, request)
     75 if self._connection is None:
---> 76     stream = self._connect(request)
     78     ssl_object = stream.get_extra_info("ssl_object")

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py:122, in HTTPConnection._connect(self, request)
    121 with Trace("connect_tcp", logger, request, kwargs) as trace:
--> 122     stream = self._network_backend.connect_tcp(**kwargs)
    123     trace.return_value = stream

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_backends/sync.py:205, in SyncBackend.connect_tcp(self, host, port, timeout, local_address, socket_options)
    200 exc_map: ExceptionMapping = {
    201     socket.timeout: ConnectTimeout,
    202     OSError: ConnectError,
    203 }
--> 205 with map_exceptions(exc_map):
    206     sock = socket.create_connection(
    207         address,
    208         timeout,
    209         source_address=source_address,
    210     )

File ~/mambaforge/envs/cfast/lib/python3.11/contextlib.py:155, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
    154 try:
--> 155     self.gen.throw(typ, value, traceback)
    156 except StopIteration as exc:
    157     # Suppress StopIteration *unless* it's the same exception that
    158     # was passed to throw().  This prevents a StopIteration
    159     # raised inside the "with" statement from being suppressed.

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_exceptions.py:14, in map_exceptions(map)
     13     if isinstance(exc, from_exc):
---> 14         raise to_exc(exc) from exc
     15 raise

ConnectError: [Errno -2] Name or service not known

The above exception was the direct cause of the following exception:

ConnectError                              Traceback (most recent call last)
Cell In[24], line 24
     20 @flow
     21 def get_data_flow():
     22     get_data_task()
---> 24 get_data_flow()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:1224, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
   1219 if task_viz_tracker:
   1220     # this is a subflow, for now return a single task and do not go further
   1221     # we can add support for exploring subflows for tasks in the future.
   1222     return track_viz_task(self.isasync, self.name, parameters)
-> 1224 return enter_flow_run_engine_from_flow_call(
   1225     self,
   1226     parameters,
   1227     wait_for=wait_for,
   1228     return_type=return_type,
   1229 )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:297, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    290     retval = from_async.wait_for_call_in_loop_thread(
    291         begin_run,
    292         done_callbacks=done_callbacks,
    293         contexts=contexts,
    294     )
    296 else:
--> 297     retval = from_sync.wait_for_call_in_loop_thread(
    298         begin_run,
    299         done_callbacks=done_callbacks,
    300         contexts=contexts,
    301     )
    303 return retval

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
    241     stack.enter_context(context)
    242 waiter.wait()
--> 243 return call.result()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout)
    312 def result(self, timeout: Optional[float] = None) -> T:
    313     """
    314     Wait for the result of the call.
    315 
    316     Not safe for use from asynchronous contexts.
    317     """
--> 318     return self.future.result(timeout=timeout)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout)
    177     raise CancelledError()
    178 elif self._state == FINISHED:
--> 179     return self.__get_result()
    181 self._condition.wait(timeout)
    183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    184     # Raise Prefect cancelled error instead of
    185     # `concurrent.futures._base.CancelledError`

File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***)
    387 with self.future.enforce_async_deadline() as cancel_scope:
    388     try:
--> 389         result = await coro
    390     finally:
    391         # Forget this call's arguments in order to free up any memory
    392         # that may be referenced by them; after a call has happened,
    393         # there's no need to keep a reference to them
    394         self.args = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/client/utilities.py:78, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     76 async with context as new_client:
     77     kwargs.setdefault("client", new_client or client)
---> 78     return await fn(*args, **kwargs)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:400, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client, user_thread)
    398     return state
    399 elif return_type == "result":
--> 400     return await state.result(fetch=True)
    401 else:
    402     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     84     raise UnfinishedRun(
     85         f"Run is in {state.type.name} state, its result is not available."
     86     )
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:867, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context, user_thread)
    862         else:
    863             from_async.call_soon_in_new_thread(
    864                 flow_call, timeout=flow.timeout_seconds
    865             )
--> 867         result = await flow_call.aresult()
    869         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    870             flow_run_context.task_run_futures, client=client
    871         )
    872 except PausedRun as exc:
    873     # could get raised either via utility or by returning Paused from a task run
    874     # if a task run pauses, we set its state as the flow's state
    875     # to preserve reschedule and timeout behavior

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:327, in Call.aresult(self)
    321 """
    322 Wait for the result of the call.
    323 
    324 For use from asynchronous contexts.
    325 """
    326 try:
--> 327     return await asyncio.wrap_future(self.future)
    328 except asyncio.CancelledError as exc:
    329     raise CancelledError() from exc

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:352, in Call._run_sync(***failed resolving arguments***)
    350 with self.future.enforce_sync_deadline() as cancel_scope:
    351     try:
--> 352         result = self.fn(*self.args, **self.kwargs)
    353     finally:
    354         # Forget this call's arguments in order to free up any memory
    355         # that may be referenced by them; after a call has happened,
    356         # there's no need to keep a reference to them
    357         self.args = None

Cell In[24], line 22, in get_data_flow()
     20 @flow
     21 def get_data_flow():
---> 22     get_data_task()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:600, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
    589     from prefect import get_client
    591     return submit_autonomous_task_run_to_engine(
    592         task=self,
    593         task_run=None,
   (...)
    597         client=get_client(),
    598     )
--> 600 return enter_task_run_engine(
    601     self,
    602     parameters=parameters,
    603     wait_for=wait_for,
    604     task_runner=SequentialTaskRunner(),
    605     return_type=return_type,
    606     mapped=False,
    607 )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:1421, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
   1419     return from_async.wait_for_call_in_loop_thread(begin_run)
   1420 else:
-> 1421     return from_sync.wait_for_call_in_loop_thread(begin_run)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
    241     stack.enter_context(context)
    242 waiter.wait()
--> 243 return call.result()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout)
    312 def result(self, timeout: Optional[float] = None) -> T:
    313     """
    314     Wait for the result of the call.
    315 
    316     Not safe for use from asynchronous contexts.
    317     """
--> 318     return self.future.result(timeout=timeout)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout)
    177     raise CancelledError()
    178 elif self._state == FINISHED:
--> 179     return self.__get_result()
    181 self._condition.wait(timeout)
    183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    184     # Raise Prefect cancelled error instead of
    185     # `concurrent.futures._base.CancelledError`

File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***)
    387 with self.future.enforce_async_deadline() as cancel_scope:
    388     try:
--> 389         result = await coro
    390     finally:
    391         # Forget this call's arguments in order to free up any memory
    392         # that may be referenced by them; after a call has happened,
    393         # there's no need to keep a reference to them
    394         self.args = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:1601, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs)
   1599     return await future._wait()
   1600 elif return_type == "result":
-> 1601     return await future._result()
   1602 else:
   1603     raise ValueError(f"Invalid return type for task engine {return_type!r}.")

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/futures.py:237, in PrefectFuture._result(self, timeout, raise_on_failure)
    235 if not final_state:
    236     raise TimeoutError("Call timed out before task finished.")
--> 237 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     84     raise UnfinishedRun(
     85         f"Run is in {state.type.name} state, its result is not available."
     86     )
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:2147, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, log_prints, interruptible, client)
   2140         logger.debug(
   2141             "Beginning execution...", extra={"state_message": True}
   2142         )
   2144     call = from_async.call_soon_in_new_thread(
   2145         create_call(task.fn, *args, **kwargs), timeout=task.timeout_seconds
   2146     )
-> 2147     result = await call.aresult()
   2149 except (CancelledError, asyncio.CancelledError) as exc:
   2150     if not call.timedout():
   2151         # If the task call was not cancelled by us; this is a crash

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:327, in Call.aresult(self)
    321 """
    322 Wait for the result of the call.
    323 
    324 For use from asynchronous contexts.
    325 """
    326 try:
--> 327     return await asyncio.wrap_future(self.future)
    328 except asyncio.CancelledError as exc:
    329     raise CancelledError() from exc

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:352, in Call._run_sync(***failed resolving arguments***)
    350 with self.future.enforce_sync_deadline() as cancel_scope:
    351     try:
--> 352         result = self.fn(*self.args, **self.kwargs)
    353     finally:
    354         # Forget this call's arguments in order to free up any memory
    355         # that may be referenced by them; after a call has happened,
    356         # there's no need to keep a reference to them
    357         self.args = None

Cell In[24], line 10, in get_data_task(url)
      6 @task(retries=2, retry_delay_seconds=5)
      7 def get_data_task(
      8     url: str = "https://api.brittle-service.com/endpoint"
      9 ) -> dict:
---> 10     response = httpx.get(url)
     12     # If the response status code is anything but a 2xx, httpx will raise
     13     # an exception. This task doesn't handle the exception, so Prefect will
     14     # catch the exception and will consider the task run failed.
     15     response.raise_for_status()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py:198, in get(url, params, headers, cookies, auth, proxy, proxies, follow_redirects, cert, verify, timeout, trust_env)
    175 def get(
    176     url: URLTypes,
    177     *,
   (...)
    188     trust_env: bool = True,
    189 ) -> Response:
    190     """
    191     Sends a `GET` request.
    192 
   (...)
    196     on this function, as `GET` requests should not include a request body.
    197     """
--> 198     return request(
    199         "GET",
    200         url,
    201         params=params,
    202         headers=headers,
    203         cookies=cookies,
    204         auth=auth,
    205         proxy=proxy,
    206         proxies=proxies,
    207         follow_redirects=follow_redirects,
    208         cert=cert,
    209         verify=verify,
    210         timeout=timeout,
    211         trust_env=trust_env,
    212     )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py:106, in request(method, url, params, content, data, files, json, headers, cookies, auth, proxy, proxies, timeout, follow_redirects, verify, cert, trust_env)
     46 """
     47 Sends an HTTP request.
     48 
   (...)
     95 ```
     96 """
     97 with Client(
     98     cookies=cookies,
     99     proxy=proxy,
   (...)
    104     trust_env=trust_env,
    105 ) as client:
--> 106     return client.request(
    107         method=method,
    108         url=url,
    109         content=content,
    110         data=data,
    111         files=files,
    112         json=json,
    113         params=params,
    114         headers=headers,
    115         auth=auth,
    116         follow_redirects=follow_redirects,
    117     )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py:827, in Client.request(self, method, url, content, data, files, json, params, headers, cookies, auth, follow_redirects, timeout, extensions)
    812     warnings.warn(message, DeprecationWarning)
    814 request = self.build_request(
    815     method=method,
    816     url=url,
   (...)
    825     extensions=extensions,
    826 )
--> 827 return self.send(request, auth=auth, follow_redirects=follow_redirects)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py:914, in Client.send(self, request, stream, auth, follow_redirects)
    906 follow_redirects = (
    907     self.follow_redirects
    908     if isinstance(follow_redirects, UseClientDefault)
    909     else follow_redirects
    910 )
    912 auth = self._build_request_auth(request, auth)
--> 914 response = self._send_handling_auth(
    915     request,
    916     auth=auth,
    917     follow_redirects=follow_redirects,
    918     history=[],
    919 )
    920 try:
    921     if not stream:

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py:942, in Client._send_handling_auth(self, request, auth, follow_redirects, history)
    939 request = next(auth_flow)
    941 while True:
--> 942     response = self._send_handling_redirects(
    943         request,
    944         follow_redirects=follow_redirects,
    945         history=history,
    946     )
    947     try:
    948         try:

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py:979, in Client._send_handling_redirects(self, request, follow_redirects, history)
    976 for hook in self._event_hooks["request"]:
    977     hook(request)
--> 979 response = self._send_single_request(request)
    980 try:
    981     for hook in self._event_hooks["response"]:

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py:1015, in Client._send_single_request(self, request)
   1010     raise RuntimeError(
   1011         "Attempted to send an async request with a sync Client instance."
   1012     )
   1014 with request_context(request=request):
-> 1015     response = transport.handle_request(request)
   1017 assert isinstance(response.stream, SyncByteStream)
   1019 response.request = request

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py:232, in HTTPTransport.handle_request(self, request)
    218 assert isinstance(request.stream, SyncByteStream)
    220 req = httpcore.Request(
    221     method=request.method,
    222     url=httpcore.URL(
   (...)
    230     extensions=request.extensions,
    231 )
--> 232 with map_httpcore_exceptions():
    233     resp = self._pool.handle_request(req)
    235 assert isinstance(resp.stream, typing.Iterable)

File ~/mambaforge/envs/cfast/lib/python3.11/contextlib.py:155, in _GeneratorContextManager.__exit__(self, typ, value, traceback)
    153     value = typ()
    154 try:
--> 155     self.gen.throw(typ, value, traceback)
    156 except StopIteration as exc:
    157     # Suppress StopIteration *unless* it's the same exception that
    158     # was passed to throw().  This prevents a StopIteration
    159     # raised inside the "with" statement from being suppressed.
    160     return exc is not value

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py:86, in map_httpcore_exceptions()
     83     raise
     85 message = str(exc)
---> 86 raise mapped_exc(message) from exc

ConnectError: [Errno -2] Name or service not known
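The task in the traceback above was declared with `retries=2, retry_delay_seconds=5`, and the run still ended in `ConnectError`. As a rough illustration of what "retry up to N times, optionally gated by a condition" means, here is a plain-Python sketch. It is not Prefect's implementation: `run_with_retries`, `should_retry`, and `flaky` are hypothetical names invented for this example, and the built-in `ConnectionError` stands in for `httpx.ConnectError` so the snippet needs no third-party packages.

```python
import time
from typing import Callable, Optional


def run_with_retries(
    fn: Callable[[], object],
    retries: int = 2,
    retry_delay_seconds: float = 0.0,
    should_retry: Optional[Callable[[Exception], bool]] = None,
) -> object:
    """Call fn, retrying up to `retries` extra times after a failure.

    Hypothetical helper for illustration only; Prefect's engine is more involved.
    """
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            # Give up when retries are exhausted or the predicate vetoes a retry.
            out_of_retries = attempt >= retries
            vetoed = should_retry is not None and not should_retry(exc)
            if out_of_retries or vetoed:
                raise
            attempt += 1
            time.sleep(retry_delay_seconds)


calls = {"count": 0}


def flaky() -> str:
    """Fail twice with a connection-style error, then succeed."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("Name or service not known")
    return "ok"


# One initial attempt plus two retries is just enough for flaky() to succeed.
result = run_with_retries(flaky, retries=2)
```

Passing a `should_retry` predicate that returns `False` for connection errors would make the helper re-raise after the first attempt, which is the same kind of decision the `retry_condition_fn` handler in the next cell makes.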
import httpx
from prefect import flow, task

def retry_handler(task, task_run, state) -> bool:
    """This is a custom retry handler to handle when we want to retry a task"""
    try:
        # Attempt to get the result of the task
        state.result()
    except httpx.HTTPStatusError as exc:
        # Retry on any HTTP status code that is not 401 or 404
        do_not_retry_on_these_codes = [401, 404]
        return exc.response.status_code not in do_not_retry_on_these_codes
    except httpx.ConnectError:
        # Do not retry when the connection itself fails (e.g. a DNS lookup error)
        return False
    except Exception:
        # For any other exception, retry
        return True

@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@flow
def get_data_flow(url):
    my_api_call_task(url=url)

if __name__ == "__main__":
    get_data_flow(url="https://httpbin.org/status/503")
19:16:36.208 | INFO    | prefect.engine - Created flow run 'brilliant-bustard' for flow 'get-data-flow'
19:16:36.271 | INFO    | Flow run 'brilliant-bustard' - Created task run 'my_api_call_task-0' for task 'my_api_call_task'
19:16:36.273 | INFO    | Flow run 'brilliant-bustard' - Executing 'my_api_call_task-0' immediately...
19:16:37.330 | ERROR   | Task run 'my_api_call_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23433/2972459076.py", line 23, in my_api_call_task
    response.raise_for_status()
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py", line 761, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
19:16:37.360 | INFO    | Task run 'my_api_call_task-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
19:16:38.386 | ERROR   | Task run 'my_api_call_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23433/2972459076.py", line 23, in my_api_call_task
    response.raise_for_status()
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py", line 761, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
19:16:38.415 | ERROR   | Task run 'my_api_call_task-0' - Finished in state Failed("Task run encountered an exception HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503")
19:16:38.418 | ERROR   | Flow run 'brilliant-bustard' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 867, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23433/2972459076.py", line 28, in get_data_flow
    my_api_call_task(url=url)
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py", line 600, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1601, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/tmp/ipykernel_23433/2972459076.py", line 8, in retry_handler
    state.result()
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/client/schemas/objects.py", line 224, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 71, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 259, in coroutine_wrapper
    return call()
           ^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__
    return self.result()
           ^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23433/2972459076.py", line 23, in my_api_call_task
    response.raise_for_status()
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py", line 761, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
19:16:38.449 | ERROR   | Flow run 'brilliant-bustard' - Finished in state Failed("Flow run encountered an exception. HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503")
---------------------------------------------------------------------------
HTTPStatusError                           Traceback (most recent call last)
Cell In[26], line 31
     28     my_api_call_task(url=url)
     30 if __name__ == "__main__":
---> 31     get_data_flow(url="https://httpbin.org/status/503")

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:1224, in Flow.__call__(self, return_state, wait_for, *args, **kwargs)
   1219 if task_viz_tracker:
   1220     # this is a subflow, for now return a single task and do not go further
   1221     # we can add support for exploring subflows for tasks in the future.
   1222     return track_viz_task(self.isasync, self.name, parameters)
-> 1224 return enter_flow_run_engine_from_flow_call(
   1225     self,
   1226     parameters,
   1227     wait_for=wait_for,
   1228     return_type=return_type,
   1229 )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:297, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type)
    290     retval = from_async.wait_for_call_in_loop_thread(
    291         begin_run,
    292         done_callbacks=done_callbacks,
    293         contexts=contexts,
    294     )
    296 else:
--> 297     retval = from_sync.wait_for_call_in_loop_thread(
    298         begin_run,
    299         done_callbacks=done_callbacks,
    300         contexts=contexts,
    301     )
    303 return retval

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
    241     stack.enter_context(context)
    242 waiter.wait()
--> 243 return call.result()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout)
    312 def result(self, timeout: Optional[float] = None) -> T:
    313     """
    314     Wait for the result of the call.
    315 
    316     Not safe for use from asynchronous contexts.
    317     """
--> 318     return self.future.result(timeout=timeout)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout)
    177     raise CancelledError()
    178 elif self._state == FINISHED:
--> 179     return self.__get_result()
    181 self._condition.wait(timeout)
    183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    184     # Raise Prefect cancelled error instead of
    185     # `concurrent.futures._base.CancelledError`

File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***)
    387 with self.future.enforce_async_deadline() as cancel_scope:
    388     try:
--> 389         result = await coro
    390     finally:
    391         # Forget this call's arguments in order to free up any memory
    392         # that may be referenced by them; after a call has happened,
    393         # there's no need to keep a reference to them
    394         self.args = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/client/utilities.py:78, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     76 async with context as new_client:
     77     kwargs.setdefault("client", new_client or client)
---> 78     return await fn(*args, **kwargs)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:400, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client, user_thread)
    398     return state
    399 elif return_type == "result":
--> 400     return await state.result(fetch=True)
    401 else:
    402     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     84     raise UnfinishedRun(
     85         f"Run is in {state.type.name} state, its result is not available."
     86     )
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:867, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context, user_thread)
    862         else:
    863             from_async.call_soon_in_new_thread(
    864                 flow_call, timeout=flow.timeout_seconds
    865             )
--> 867         result = await flow_call.aresult()
    869         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    870             flow_run_context.task_run_futures, client=client
    871         )
    872 except PausedRun as exc:
    873     # could get raised either via utility or by returning Paused from a task run
    874     # if a task run pauses, we set its state as the flow's state
    875     # to preserve reschedule and timeout behavior

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:327, in Call.aresult(self)
    321 """
    322 Wait for the result of the call.
    323 
    324 For use from asynchronous contexts.
    325 """
    326 try:
--> 327     return await asyncio.wrap_future(self.future)
    328 except asyncio.CancelledError as exc:
    329     raise CancelledError() from exc

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:352, in Call._run_sync(***failed resolving arguments***)
    350 with self.future.enforce_sync_deadline() as cancel_scope:
    351     try:
--> 352         result = self.fn(*self.args, **self.kwargs)
    353     finally:
    354         # Forget this call's arguments in order to free up any memory
    355         # that may be referenced by them; after a call has happened,
    356         # there's no need to keep a reference to them
    357         self.args = None

Cell In[26], line 28, in get_data_flow(url)
     26 @flow
     27 def get_data_flow(url):
---> 28     my_api_call_task(url=url)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:600, in Task.__call__(self, return_state, wait_for, *args, **kwargs)
    589     from prefect import get_client
    591     return submit_autonomous_task_run_to_engine(
    592         task=self,
    593         task_run=None,
   (...)
    597         client=get_client(),
    598     )
--> 600 return enter_task_run_engine(
    601     self,
    602     parameters=parameters,
    603     wait_for=wait_for,
    604     task_runner=SequentialTaskRunner(),
    605     return_type=return_type,
    606     mapped=False,
    607 )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:1421, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped)
   1419     return from_async.wait_for_call_in_loop_thread(begin_run)
   1420 else:
-> 1421     return from_sync.wait_for_call_in_loop_thread(begin_run)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts)
    241     stack.enter_context(context)
    242 waiter.wait()
--> 243 return call.result()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout)
    312 def result(self, timeout: Optional[float] = None) -> T:
    313     """
    314     Wait for the result of the call.
    315 
    316     Not safe for use from asynchronous contexts.
    317     """
--> 318     return self.future.result(timeout=timeout)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout)
    177     raise CancelledError()
    178 elif self._state == FINISHED:
--> 179     return self.__get_result()
    181 self._condition.wait(timeout)
    183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    184     # Raise Prefect cancelled error instead of
    185     # `concurrent.futures._base.CancelledError`

File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***)
    387 with self.future.enforce_async_deadline() as cancel_scope:
    388     try:
--> 389         result = await coro
    390     finally:
    391         # Forget this call's arguments in order to free up any memory
    392         # that may be referenced by them; after a call has happened,
    393         # there's no need to keep a reference to them
    394         self.args = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:1601, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs)
   1599     return await future._wait()
   1600 elif return_type == "result":
-> 1601     return await future._result()
   1602 else:
   1603     raise ValueError(f"Invalid return type for task engine {return_type!r}.")

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/futures.py:237, in PrefectFuture._result(self, timeout, raise_on_failure)
    235 if not final_state:
    236     raise TimeoutError("Call timed out before task finished.")
--> 237 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     84     raise UnfinishedRun(
     85         f"Run is in {state.type.name} state, its result is not available."
     86     )
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

Cell In[26], line 8, in retry_handler(task, task_run, state)
      5 """This is a custom retry handler to handle when we want to retry a task"""
      6 try:
      7     # Attempt to get the result of the task
----> 8     state.result()
      9 except httpx.HTTPStatusError as exc:
     10     # Retry on any HTTP status code that is not 401 or 404
     11     do_not_retry_on_these_codes = [401, 404]

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/client/schemas/objects.py:224, in State.result(self, raise_on_failure, fetch)
    152 """
    153 Retrieve the result attached to this state.
    154 
   (...)
    220     hello
    221 """
    222 from prefect.states import get_state_result
--> 224 return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:71, in get_state_result(state, raise_on_failure, fetch)
     69         return state.data
     70 else:
---> 71     return _get_state_result(state, raise_on_failure=raise_on_failure)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/utilities/asyncutils.py:259, in sync_compatible.<locals>.coroutine_wrapper(*args, **kwargs)
    257 # Run in a new event loop, but use a `Call` for nested context detection
    258 call = create_call(async_fn, *args, **kwargs)
--> 259 return call()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:432, in Call.__call__(self)
    430     return run_and_return_result()
    431 else:
--> 432     return self.result()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout)
    312 def result(self, timeout: Optional[float] = None) -> T:
    313     """
    314     Wait for the result of the call.
    315 
    316     Not safe for use from asynchronous contexts.
    317     """
--> 318     return self.future.result(timeout=timeout)

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout)
    177     raise CancelledError()
    178 elif self._state == FINISHED:
--> 179     return self.__get_result()
    181 self._condition.wait(timeout)
    183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
    184     # Raise Prefect cancelled error instead of
    185     # `concurrent.futures._base.CancelledError`

File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self)
    399 if self._exception:
    400     try:
--> 401         raise self._exception
    402     finally:
    403         # Break a reference cycle with the exception in self._exception
    404         self = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***)
    387 with self.future.enforce_async_deadline() as cancel_scope:
    388     try:
--> 389         result = await coro
    390     finally:
    391         # Forget this call's arguments in order to free up any memory
    392         # that may be referenced by them; after a call has happened,
    393         # there's no need to keep a reference to them
    394         self.args = None

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure)
     84     raise UnfinishedRun(
     85         f"Run is in {state.type.name} state, its result is not available."
     86     )
     88 if raise_on_failure and (
     89     state.is_crashed() or state.is_failed() or state.is_cancelled()
     90 ):
---> 91     raise await get_state_exception(state)
     93 if isinstance(state.data, DataDocument):
     94     result = result_from_state_with_data_document(
     95         state, raise_on_failure=raise_on_failure
     96     )

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:2147, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, log_prints, interruptible, client)
   2140         logger.debug(
   2141             "Beginning execution...", extra={"state_message": True}
   2142         )
   2144     call = from_async.call_soon_in_new_thread(
   2145         create_call(task.fn, *args, **kwargs), timeout=task.timeout_seconds
   2146     )
-> 2147     result = await call.aresult()
   2149 except (CancelledError, asyncio.CancelledError) as exc:
   2150     if not call.timedout():
   2151         # If the task call was not cancelled by us; this is a crash

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:327, in Call.aresult(self)
    321 """
    322 Wait for the result of the call.
    323 
    324 For use from asynchronous contexts.
    325 """
    326 try:
--> 327     return await asyncio.wrap_future(self.future)
    328 except asyncio.CancelledError as exc:
    329     raise CancelledError() from exc

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:352, in Call._run_sync(***failed resolving arguments***)
    350 with self.future.enforce_sync_deadline() as cancel_scope:
    351     try:
--> 352         result = self.fn(*self.args, **self.kwargs)
    353     finally:
    354         # Forget this call's arguments in order to free up any memory
    355         # that may be referenced by them; after a call has happened,
    356         # there's no need to keep a reference to them
    357         self.args = None

Cell In[26], line 23, in my_api_call_task(url)
     20 @task(retries=1, retry_condition_fn=retry_handler)
     21 def my_api_call_task(url):
     22     response = httpx.get(url)
---> 23     response.raise_for_status()
     24     return response.json()

File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py:761, in Response.raise_for_status(self)
    759 error_type = error_types.get(status_class, "Invalid status code")
    760 message = message.format(self, error_type=error_type)
--> 761 raise HTTPStatusError(message, request=request, response=self)

HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
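In the run above, the 503 response is not in the do-not-retry list, so the handler returns True and Prefect makes one more attempt (the AwaitingRetry line) before the task ends in Failed. The decision rule itself can be checked without Prefect or a network call. This is a minimal sketch using only the standard library; `should_retry_status` and `NO_RETRY_CODES` are hypothetical names introduced here and are not part of Prefect or httpx.

```python
# Hypothetical helper mirroring the status-code rule used in retry_handler.
NO_RETRY_CODES = frozenset({401, 404})


def should_retry_status(status_code: int) -> bool:
    """Return True when a failed HTTP call with this status is worth retrying."""
    return status_code not in NO_RETRY_CODES


# Evaluate the rule for a few representative status codes.
decisions = {code: should_retry_status(code) for code in (401, 404, 500, 503)}
print(decisions)
```

Keeping the rule in a small pure function like this makes it easy to unit test before wiring it into a `retry_condition_fn`.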

Timeouts

from prefect import task, get_run_logger
import time

# timeout_seconds=1 caps the task run at one second; the 5-second sleep exceeds it
@task(timeout_seconds=1)
def show_timeouts():
    logger = get_run_logger()
    logger.info("I will execute")
    time.sleep(5)
    logger.info("I will not execute")
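The effect of a timeout can be seen without a Prefect server by using plain asyncio as an analogy. The sketch below is not how Prefect enforces `timeout_seconds`; it only reproduces the behaviour the log messages in the task describe: the first message is recorded, and the second is never reached. The names `slow_job`, `main`, `events` and `timed_out` are illustrative.

```python
import asyncio

events = []


async def slow_job():
    events.append("I will execute")
    await asyncio.sleep(0.5)  # longer than the timeout below
    events.append("I will not execute")


async def main() -> bool:
    try:
        # Cancel slow_job if it has not finished within 0.05 seconds
        await asyncio.wait_for(slow_job(), timeout=0.05)
    except asyncio.TimeoutError:
        return True
    return False


timed_out = asyncio.run(main())
print(events, timed_out)
```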

Wait for

@task
def task_1():
    pass

@task
def task_2():
    pass

@flow
def my_flow():
    x = task_1()

    # task_2 will wait for task_1 to complete
    y = task_2(wait_for=[x])

Maps

from prefect import flow, task

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
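    print_nums(nums)
    # .map submits one square_num task run per element of nums
    squared_nums = square_num.map(nums)
    # the mapped results are resolved before print_nums runs on them
    print_nums(squared_nums)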

map_flow([1,2,3,5,8,13])
19:23:30.843 | INFO    | prefect.engine - Created flow run 'stereotyped-wolf' for flow 'map-flow'
19:23:30.904 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'print_nums-0' for task 'print_nums'
19:23:30.906 | INFO    | Flow run 'stereotyped-wolf' - Executing 'print_nums-0' immediately...
1
2
3
5
8
13
19:23:30.981 | INFO    | Task run 'print_nums-0' - Finished in state Completed()
19:23:31.067 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'square_num-3' for task 'square_num'
19:23:31.069 | INFO    | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-3' for execution.
19:23:31.085 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'square_num-2' for task 'square_num'
19:23:31.087 | INFO    | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-2' for execution.
19:23:31.105 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'square_num-4' for task 'square_num'
19:23:31.106 | INFO    | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-4' for execution.
19:23:31.176 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'square_num-1' for task 'square_num'
19:23:31.179 | INFO    | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-1' for execution.
19:23:31.216 | INFO    | Task run 'square_num-3' - Finished in state Completed()
19:23:31.233 | INFO    | Task run 'square_num-4' - Finished in state Completed()
19:23:31.244 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'square_num-5' for task 'square_num'
19:23:31.246 | INFO    | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-5' for execution.
19:23:31.331 | INFO    | Task run 'square_num-5' - Finished in state Completed()
19:23:31.352 | INFO    | Task run 'square_num-1' - Finished in state Completed()
19:23:31.396 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'square_num-0' for task 'square_num'
19:23:31.399 | INFO    | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-0' for execution.
19:23:31.452 | INFO    | Flow run 'stereotyped-wolf' - Created task run 'print_nums-1' for task 'print_nums'
19:23:31.453 | INFO    | Flow run 'stereotyped-wolf' - Executing 'print_nums-1' immediately...
19:23:31.477 | INFO    | Task run 'square_num-0' - Finished in state Completed()
19:23:31.615 | INFO    | Task run 'square_num-2' - Finished in state Completed()
1
4
9
25
64
169
19:23:31.681 | INFO    | Task run 'print_nums-1' - Finished in state Completed()
19:23:31.710 | INFO    | Flow run 'stereotyped-wolf' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]

Async

import asyncio

from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1) # yield
        print(value, end=" ")

@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)
asyncio.run(async_flow())
from prefect import get_client

# Top-level `async with` works in a notebook; in a script, wrap this in an async function.
async with get_client() as client:
    # set a concurrency limit of 10 on the 'small_instance' tag
    limit_id = await client.create_concurrency_limit(
        tag="small_instance",
        concurrency_limit=10,
    )
!prefect concurrency-limit inspect small_instance
╭──────────────────────────────────────────────────────────────────────────╮
│        Concurrency Limit ID: 879f2e40-8387-47c5-af34-0b164f7ea8bc        │
│ ┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓ │
│ ┃ Tag            ┃ Concurrency Limit ┃ Created        ┃ Updated        ┃ │
│ ┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩ │
│ │ small_instance │ 10                │ '1 minute ago' │ '1 minute ago' │ │
│ └────────────────┴───────────────────┴────────────────┴────────────────┘ │
│ ┏━━━━━━━━━━━━━━━━━━━━━┓                                                  │
│ ┃ Active Task Run IDs ┃                                                  │
│ ┡━━━━━━━━━━━━━━━━━━━━━┩                                                  │
│ └─────────────────────┘                                                  │
╰──────────────────────────────────────────────────────────────────────────╯

Deployments

name: prefect.yaml

# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: nbs
prefect-version: 2.16.8

# build section allows you to manage and build docker images
build:

# push section allows you to manage if and how this project is uploaded to remote locations
push:

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.git_clone:
    repository: git@github.com:bthek1/MLtools.git
    branch: main

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: slow_flow
  version:
  tags: []
  description: Sleepy flow - sleeps the provided amount of time (in seconds).
  entrypoint: nbs/prefect_deployment_serve.py:slow_flow
  parameters: {}
  work_pool:
    name: test-pool
    work_queue_name:
    job_variables: {}
  schedules:
  - interval: 30.0
    anchor_date: '2024-04-03T13:38:23.549390+00:00'
    timezone: UTC
    active: true
- name: fast_flow
  version:
  tags: []
  description: Fastest flow this side of the Mississippi.
  entrypoint: nbs/prefect_deployment_serve.py:fast_flow
  parameters: {}
  work_pool:
    name: test-pool
    work_queue_name:
    job_variables: {}
  schedules:
  - interval: 60.0
    anchor_date: '2024-04-03T14:13:30.384393+00:00'
    timezone: UTC
    active: true
  - interval: 150.0
    anchor_date: '2024-04-03T14:13:45.806620+00:00'
    timezone: UTC
    active: false

Work Pools : To do

Work pool overview

Work pools organize work for execution. Each work pool has a type that corresponds to the infrastructure that will execute the flow code and to how work is delivered to that environment. Pull work pools require workers (or, less ideally, agents) to poll the work pool for flow runs to execute. Push work pools submit runs directly to serverless infrastructure providers such as Google Cloud Run, Azure Container Instances, and AWS ECS, without the need for an agent or worker.

prefect work-pool create test-pool
!prefect work-pool ls
                                   Work Pools                                   
┏━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┓
┃ Name      ┃ Type   ┃                                   ID ┃ Concurrency Lim… ┃
┡━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━┩
│ test-pool │ proce… │ 5d41c025-ec45-4e64-9ef6-3ca91b9684e0 │ None             │
└───────────┴────────┴──────────────────────────────────────┴──────────────────┘
                           (**) denotes a paused pool                           
!prefect work-pool inspect 'test-pool'
WorkPool(
    id='5d41c025-ec45-4e64-9ef6-3ca91b9684e0',
    created=DateTime(2024, 4, 3, 10, 12, 27, 298785, tzinfo=Timezone('+00:00')),
    updated=DateTime(2024, 4, 3, 10, 12, 27, 306000, tzinfo=Timezone('+00:00')),
    name='test-pool',
    type='process',
    base_job_template={
        'job_configuration': {
            'command': '{{ command }}',
            'env': '{{ env }}',
            'labels': '{{ labels }}',
            'name': '{{ name }}',
            'stream_output': '{{ stream_output }}',
            'working_dir': '{{ working_dir }}'
        },
        'variables': {
            'type': 'object',
            'properties': {
                'name': {
                    'title': 'Name',
                    'description': 'Name given to infrastructure created by a 
worker.',
                    'type': 'string'
                },
                'env': {
                    'title': 'Environment Variables',
                    'description': 'Environment variables to set when starting a
flow run.',
                    'type': 'object',
                    'additionalProperties': {'type': 'string'}
                },
                'labels': {
                    'title': 'Labels',
                    'description': 'Labels applied to infrastructure created by 
a worker.',
                    'type': 'object',
                    'additionalProperties': {'type': 'string'}
                },
                'command': {
                    'title': 'Command',
                    'description': 'The command to use when starting a flow run.
In most cases, this should be left blank and the command will be automatically 
generated by the worker.',
                    'type': 'string'
                },
                'stream_output': {
                    'title': 'Stream Output',
                    'description': 'If enabled, workers will stream output from 
flow run processes to local standard output.',
                    'default': True,
                    'type': 'boolean'
                },
                'working_dir': {
                    'title': 'Working Directory',
                    'description': 'If provided, workers will open flow run 
processes within the specified path as the working directory. Otherwise, a 
temporary directory will be created.',
                    'type': 'string',
                    'format': 'path'
                }
            }
        }
    },
    status=WorkPoolStatus.NOT_READY,
    default_queue_id='12cd8c55-db98-4e1c-baf6-ffffcfbb613f'
)

Schedules : To do

Prefect supports several types of schedules that cover a wide range of use cases and offer a large degree of customization:

  • Cron is most appropriate for users who are already familiar with cron from previous use.
  • Interval is best suited for deployments that need to run at some consistent cadence that isn’t related to absolute time.
  • RRule is best suited for deployments that rely on calendar logic for simple recurring schedules, irregular intervals, exclusions, or day-of-month adjustments.

Tip: Schedules can be inactive. When you create or edit a schedule, you can set the active property to False in Python (or false in a YAML file) to deactivate the schedule. This is useful if you want to keep the schedule configuration but temporarily stop the schedule from creating new flow runs.
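
To make the Interval schedule type more concrete, here is a minimal sketch of the arithmetic behind it, assuming every occurrence is the anchor date plus a whole number of intervals (so the anchor is a reference point, not a start time). It uses only the standard library. `next_interval_runs` is a hypothetical helper written for these notes, not a Prefect API, and the anchor date and 30-second interval are borrowed from the slow_flow schedule in the prefect.yaml shown earlier.

```python
from datetime import datetime, timedelta, timezone


def next_interval_runs(anchor, interval_seconds, after, count=3):
    """Return the first `count` scheduled times at or after `after`.

    Hypothetical helper (not part of Prefect): scheduled times are assumed
    to be anchor + k * interval for any integer k, including negative k.
    """
    interval = timedelta(seconds=interval_seconds)
    # Ceiling division with exact timedelta arithmetic:
    # ceil((after - anchor) / interval) == -((anchor - after) // interval)
    k = -((anchor - after) // interval)
    return [anchor + (k + i) * interval for i in range(count)]


# Anchor date taken from the slow_flow schedule in prefect.yaml
anchor = datetime(2024, 4, 3, 13, 38, 23, 549390, tzinfo=timezone.utc)

# Ask for the next three runs starting 45 seconds after the anchor
runs = next_interval_runs(anchor, 30, after=anchor + timedelta(seconds=45))
for run in runs:
    print(run.isoformat())
```

Because the offset is computed from the anchor rather than from "now", the cadence stays aligned to the anchor even when the query time falls between two occurrences or before the anchor itself.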

Results

from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    future = my_task.submit()
    return future.result() + 1

result = my_flow()
assert result == 2
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:348: UserWarning: A task named 'my_task' and defined at '/tmp/ipykernel_44036/1506838836.py:3' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_44036/1506838836.py:7' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
21:13:16.692 | INFO    | prefect.engine - Created flow run 'ultra-capuchin' for flow 'my-flow'
21:13:16.756 | INFO    | Flow run 'ultra-capuchin' - Created task run 'my_task-0' for task 'my_task'
21:13:16.758 | INFO    | Flow run 'ultra-capuchin' - Submitted task run 'my_task-0' for execution.
21:13:16.833 | INFO    | Task run 'my_task-0' - Finished in state Completed()
21:13:16.862 | INFO    | Flow run 'ultra-capuchin' - Finished in state Completed()

Error handling

from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    state = my_task(return_state=True)

    if state.is_failed():
        print("Oh no! The task failed. Falling back to '1'.")
        result = 1
    else:
        result = state.result()

    return result + 1

result = my_flow()
assert result == 2
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:348: UserWarning: A task named 'my_task' and defined at '/tmp/ipykernel_44036/2193582311.py:3' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_44036/2193582311.py:7' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
21:24:04.344 | INFO    | prefect.engine - Created flow run 'active-flamingo' for flow 'my-flow'
21:24:04.406 | INFO    | Flow run 'active-flamingo' - Created task run 'my_task-0' for task 'my_task'
21:24:04.407 | INFO    | Flow run 'active-flamingo' - Executing 'my_task-0' immediately...
21:24:04.454 | ERROR   | Task run 'my_task-0' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_44036/2193582311.py", line 5, in my_task
    raise ValueError()
ValueError
21:24:04.485 | ERROR   | Task run 'my_task-0' - Finished in state Failed('Task run encountered an exception ValueError: ')
Oh no! The task failed. Falling back to '1'.
21:24:04.516 | INFO    | Flow run 'active-flamingo' - Finished in state Completed()

Artifacts

from prefect import flow, task
from prefect.artifacts import create_link_artifact

@task
def my_first_task():
    create_link_artifact(
        key="create-link-artifact",
        link="my_first_task",
        description="## my_first_task",
    )

@task
def my_second_task():
    create_link_artifact(
        key="create-link-artifact",
        link="my_second_task",
        description="## my_second_task",
    )

@flow
def my_flow():
    create_link_artifact(
        key="create-link-artifact",
        link="my_flow",
        description="## my_flow",
    )
    my_first_task()
    my_second_task()

if __name__ == "__main__":
    my_flow()
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:348: UserWarning: A task named 'my_first_task' and defined at '/tmp/ipykernel_44036/2726054530.py:4' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:348: UserWarning: A task named 'my_second_task' and defined at '/tmp/ipykernel_44036/2726054530.py:12' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_44036/2726054530.py:20' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
21:31:10.608 | INFO    | prefect.engine - Created flow run 'important-limpet' for flow 'my-flow'
21:31:10.688 | INFO    | Flow run 'important-limpet' - Created task run 'my_first_task-0' for task 'my_first_task'
21:31:10.690 | INFO    | Flow run 'important-limpet' - Executing 'my_first_task-0' immediately...
21:31:10.782 | INFO    | Task run 'my_first_task-0' - Finished in state Completed()
21:31:10.803 | INFO    | Flow run 'important-limpet' - Created task run 'my_second_task-0' for task 'my_second_task'
21:31:10.805 | INFO    | Flow run 'important-limpet' - Executing 'my_second_task-0' immediately...
21:31:10.903 | INFO    | Task run 'my_second_task-0' - Finished in state Completed()
21:31:10.932 | INFO    | Flow run 'important-limpet' - Finished in state Completed('All states completed.')
from prefect import flow
from prefect.artifacts import create_link_artifact

@flow
def my_flow():
    create_link_artifact(
        key="my-important-link",
        link="https://www.prefect.io/",
        link_text="Prefect",
    )

if __name__ == "__main__":
    my_flow()
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_44036/1748943510.py:4' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
21:57:21.601 | INFO    | prefect.engine - Created flow run 'tomato-eagle' for flow 'my-flow'
21:57:21.688 | INFO    | Flow run 'tomato-eagle' - Finished in state Completed()
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact

@task
def markdown_task():
    na_revenue = 500000
    markdown_report = f"""# Sales Report

## Summary

In the past quarter, our company saw a significant increase in sales, with a total revenue of $1,000,000. 
This represents a 20% increase over the same period last year.

## Sales by Region

| Region        | Revenue |
|:--------------|-------:|
| North America | ${na_revenue:,} |
| Europe        | $250,000 |
| Asia          | $150,000 |
| South America | $75,000 |
| Africa        | $25,000 |

## Top Products

1. Product A - $300,000 in revenue
2. Product B - $200,000 in revenue
3. Product C - $150,000 in revenue

## Conclusion

Overall, these results are very encouraging and demonstrate the success of our sales team in increasing revenue 
across all regions. However, we still have room for improvement and should focus on further increasing sales in 
the coming quarter.
"""
    create_markdown_artifact(
        key="gtm-report",
        markdown=markdown_report,
        description="Quarterly Sales Report",
    )

@flow()
def my_flow():
    markdown_task()


if __name__ == "__main__":
    my_flow()
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_44036/3501414838.py:42' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
21:58:42.196 | INFO    | prefect.engine - Created flow run 'sociable-jaguarundi' for flow 'my-flow'
21:58:42.266 | INFO    | Flow run 'sociable-jaguarundi' - Created task run 'markdown_task-0' for task 'markdown_task'
21:58:42.267 | INFO    | Flow run 'sociable-jaguarundi' - Executing 'markdown_task-0' immediately...
21:58:42.360 | INFO    | Task run 'markdown_task-0' - Finished in state Completed()
21:58:42.392 | INFO    | Flow run 'sociable-jaguarundi' - Finished in state Completed('All states completed.')

States

When calling a task or a flow, there are three types of returned values:

  • Data: A Python object (such as int, str, dict, list, and so on).
  • State: A Prefect object indicating the state of a flow or task run.
  • PrefectFuture: A Prefect object that contains both data and State.

Returning data is the default behavior any time you call your_task().

Returning a Prefect State occurs any time you call your task or flow with the argument return_state=True.

Returning a PrefectFuture is achieved by calling your_task.submit().

from prefect import flow

def my_success_hook(flow, flow_run, state):
    print("Flow run succeeded!")

@flow(on_completion=[my_success_hook])
def my_flow():
    return 42

my_flow()
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_44036/2923164121.py:6' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
22:27:55.866 | INFO    | prefect.engine - Created flow run 'lime-bullmastiff' for flow 'my-flow'
22:27:55.935 | INFO    | Flow run 'lime-bullmastiff' - Running hook 'my_success_hook' in response to entering state 'Completed'
Flow run succeeded!
22:27:55.939 | INFO    | Flow run 'lime-bullmastiff' - Hook 'my_success_hook' finished running successfully
22:27:55.941 | INFO    | Flow run 'lime-bullmastiff' - Finished in state Completed()
42

Create flow run state change hooks

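from prefect import Flow, flow
from prefect.client.schemas.objects import FlowRun, State

def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State):
    """This is the required signature for a flow run state
    change hook. This hook can only be passed into flows.
    """

# pass hook as a list of callables
@flow(on_completion=[my_flow_hook])
def my_flow():  # illustrative flow body
    return 42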

Create task run state change hooks

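from prefect import Task, task
from prefect.client.schemas.objects import State, TaskRun

def my_task_hook(task: Task, task_run: TaskRun, state: State):
    """This is the required signature for a task run state change
    hook. This hook can only be passed into tasks.
    """

# pass hook as a list of callables
@task(on_failure=[my_task_hook])
def my_task():  # illustrative task body that fails so the hook fires
    raise ValueError("trigger the on_failure hook")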

Serve

from prefect import flow


@flow(log_prints=True)
def hello_world(name: str = "world", goodbye: bool = False):
    print(f"Hello {name} from Prefect! 🤗")

    if goodbye:
        print(f"Goodbye {name}!")


if __name__ == "__main__":
    # creates a deployment and stays running to monitor for work instructions generated on the server

    hello_world.serve(name="my-first-deployment",
                      tags=["onboarding"],
                      parameters={"goodbye": True},
                      interval=60)
import time
from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)