from prefect import flow, task

@task
def print_hello(name):
    print(f"Hello {name}!")

@flow(name="Hello Flow")
def hello_world(name="world"):
    print_hello(name)
Prefect
Start prefect server
prefect server start
http://127.0.0.1:4200
Change Settings
prefect config set PREFECT_API_URL="http://127.0.0.1:4200/api"
View settings
prefect config view --show-defaults
Flows
Flows can be thought of as special types of functions: they take inputs, perform work, and return an output. In fact, you can turn any function into a Prefect flow by adding the @flow decorator. Once a function becomes a flow, its behavior changes: each call is tracked as a flow run, and the flow accepts the configuration settings described under Flow settings, such as retries, timeouts, and parameter validation.
Task called within Flows
import datetime
from prefect import flow

@flow(flow_run_name="{name}-on-{date:%A}")
def my_flow(name: str, date: datetime.datetime):
    pass

# creates a flow run called 'marvin-on-Thursday'
my_flow(name="marvin", date=datetime.datetime.now(datetime.timezone.utc))
18:17:46.247 | INFO | prefect.engine - Created flow run 'placid-finch' for flow 'my-flow'
18:17:46.320 | INFO | Flow run 'placid-finch' - Finished in state Completed()
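The flow_run_name template above uses ordinary Python format-string syntax, so its result can be previewed without Prefect. The sketch below is only an illustration of how the placeholders resolve, not Prefect's internal implementation; the fixed date is an assumption chosen to make the output reproducible.

```python
import datetime

# The same template string passed to @flow(flow_run_name=...) above.
template = "{name}-on-{date:%A}"

# A fixed date (hypothetical, chosen for reproducibility).
# 4 January 2024 fell on a Thursday.
date = datetime.datetime(2024, 1, 4, tzinfo=datetime.timezone.utc)

# str.format fills each placeholder from the keyword arguments;
# the ":%A" format spec renders the datetime as its weekday name.
run_name = template.format(name="marvin", date=date)
print(run_name)
```

Because the placeholders are filled from the flow's parameters, any parameter name can appear in the template.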
Flow settings
Flows allow a great deal of configuration by passing arguments to the decorator. Flows accept the following optional settings.
Argument | Description |
---|---|
description | An optional string description for the flow. If not provided, the description will be pulled from the docstring for the decorated function. |
name | An optional name for the flow. If not provided, the name will be inferred from the function. |
retries | An optional number of times to retry on flow run failure. |
retry_delay_seconds | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if retries is nonzero. |
flow_run_name | An optional name to distinguish runs of this flow. It can be given as a string template with the flow's parameters as variables, or as a function that returns a string. |
task_runner | An optional task runner to use for task execution within the flow when you .submit() tasks. If not provided and you .submit() tasks, the ConcurrentTaskRunner will be used. |
timeout_seconds | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it will be marked as failed. Flow execution may continue until the next task is called. |
validate_parameters | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is True. |
version | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |
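To make the retries and retry_delay_seconds settings concrete, here is a plain-Python sketch of the semantics they describe: one initial attempt plus up to `retries` further attempts, with a pause between them. The names `run_with_retries` and `flaky` are hypothetical and exist only for this illustration; Prefect handles retries through its own state orchestration rather than through code like this.

```python
import time


def run_with_retries(fn, retries=0, retry_delay_seconds=0.0):
    """Call fn; on failure, retry up to `retries` more times."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            # No retries left: re-raise the last error.
            if attempts > retries:
                raise
            time.sleep(retry_delay_seconds)


calls = {"count": 0}


def flaky():
    """Hypothetical function that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result, attempts = run_with_retries(flaky, retries=2, retry_delay_seconds=0.01)
print(result, attempts)
```

With retries=2 the function is attempted at most three times in total, so a call that fails twice still succeeds on the last attempt.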
Flow Example
from prefect import flow, task
import datetime
from prefect.runtime import flow_run
from prefect.task_runners import SequentialTaskRunner

def generate_flow_run_name():
    flow_name = flow_run.flow_name
    parameters = flow_run.parameters
    name = parameters["name"]

    date = datetime.datetime.now(datetime.timezone.utc)

    return f"flow run name test: {flow_name}-with-{name}: {date:%A}"

@task(name="task test")
def print_hello(name: str) -> str:
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="flow test",
      description="flow test description",
      task_runner=SequentialTaskRunner(),
      flow_run_name=generate_flow_run_name
)
def hello_world(name: str = "world") -> None:
    message = print_hello(name)

hello_world("Ben")
18:31:47.848 | INFO | prefect.engine - Created flow run 'caped-swan' for flow 'flow test'
18:31:47.927 | INFO | Flow run 'flow run name test: flow test-with-Ben: Wednesday' - Created task run 'task test-0' for task 'task test'
18:31:47.929 | INFO | Flow run 'flow run name test: flow test-with-Ben: Wednesday' - Executing 'task test-0' immediately...
Hello Ben!
18:31:47.998 | INFO | Task run 'task test-0' - Finished in state Completed()
18:31:48.025 | INFO | Flow run 'caped-swan' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`'))]
import graphviz
from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@task(name="Print Hello Again")
def print_hello_again(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Hello Flow")
def hello_world(name="world"):
    message = print_hello(name)
    message2 = print_hello_again(message)

# In a notebook, where an event loop is already running, this call returns
# a coroutine; use `await hello_world.visualize()` there to render the graph.
hello_world.visualize()
<coroutine object Flow.visualize>
Subflows
A subflow run is created when a flow function is called inside the execution of another flow. The primary flow is the “parent” flow. The flow created within the parent is the “child” flow or “subflow.”
Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately. When a subflow starts, it will create a new task runner for tasks within the subflow. When the subflow completes, the task runner is shut down.
from prefect import flow, task

@task(name="Print Hello")
def print_hello(name):
    msg = f"Hello {name}!"
    print(msg)
    return msg

@flow(name="Subflow")
def my_subflow(msg):
    print(f"Subflow says: {msg}")

@flow(name="Hello Flow")
def hello_world(name="world"):
    for _ in range(3):
        message = print_hello(name)
        my_subflow(message)

hello_world("Marvin")
18:41:19.292 | INFO | prefect.engine - Created flow run 'wonderful-wolf' for flow 'Hello Flow'
18:41:19.356 | INFO | Flow run 'wonderful-wolf' - Created task run 'Print Hello-0' for task 'Print Hello'
18:41:19.358 | INFO | Flow run 'wonderful-wolf' - Executing 'Print Hello-0' immediately...
Hello Marvin!
18:41:19.430 | INFO | Task run 'Print Hello-0' - Finished in state Completed()
18:41:19.504 | INFO | Flow run 'wonderful-wolf' - Created subflow run 'aspiring-aardwark' for flow 'Subflow'
Subflow says: Hello Marvin!
18:41:19.585 | INFO | Flow run 'aspiring-aardwark' - Finished in state Completed()
18:41:19.607 | INFO | Flow run 'wonderful-wolf' - Created task run 'Print Hello-1' for task 'Print Hello'
18:41:19.608 | INFO | Flow run 'wonderful-wolf' - Executing 'Print Hello-1' immediately...
Hello Marvin!
18:41:19.672 | INFO | Task run 'Print Hello-1' - Finished in state Completed()
18:41:19.738 | INFO | Flow run 'wonderful-wolf' - Created subflow run 'garrulous-starling' for flow 'Subflow'
Subflow says: Hello Marvin!
18:41:19.815 | INFO | Flow run 'garrulous-starling' - Finished in state Completed()
18:41:19.839 | INFO | Flow run 'wonderful-wolf' - Created task run 'Print Hello-2' for task 'Print Hello'
18:41:19.840 | INFO | Flow run 'wonderful-wolf' - Executing 'Print Hello-2' immediately...
Hello Marvin!
18:41:19.904 | INFO | Task run 'Print Hello-2' - Finished in state Completed()
18:41:19.976 | INFO | Flow run 'wonderful-wolf' - Created subflow run 'snobbish-rottweiler' for flow 'Subflow'
Subflow says: Hello Marvin!
18:41:20.056 | INFO | Flow run 'snobbish-rottweiler' - Finished in state Completed()
18:41:20.084 | INFO | Flow run 'wonderful-wolf' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `str`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]
Subflows or tasks?
In Prefect you can call tasks or subflows to do work within your workflow, including passing results from other tasks to your subflow. So a common question is:
"When should I use a subflow instead of a task?"
We recommend writing tasks that do a discrete, specific piece of work in your workflow: calling an API, performing a database operation, analyzing or transforming a data point.
Prefect tasks are well suited to parallel or distributed execution using distributed computation frameworks such as Dask or Ray.
For troubleshooting, the more granular your tasks are, the easier it is to find and fix the problem when one of them fails.
Subflows enable you to group related tasks within your workflow.
Here are some scenarios where you might choose to use a subflow rather than calling tasks individually:
- Observability: Subflows, like any other flow run, have first-class observability within the Prefect UI and Prefect Cloud. You'll see subflow status in the **Flow Runs** dashboard rather than having to dig down into the tasks within a specific flow run. See [Final state determination](#final-state-determination) for some examples of leveraging task state within flows.
- Conditional flows: If you have a group of tasks that run only under certain conditions, you can group them within a subflow and conditionally run the subflow rather than each task individually.
- Parameters: Flows have first-class support for parameterization, making it easy to run the same group of tasks in different use cases by simply passing different parameters to the subflow in which they run.
- Task runners: Subflows enable you to specify the task runner used for tasks within the flow. For example, if you want to optimize parallel execution of certain tasks with Dask, you can group them in a subflow that uses the Dask task runner. You can use a different task runner for each subflow.
from prefect import flow
from datetime import datetime, timezone

@flow
def what_day_is_it(date: datetime = None):
    if date is None:
        date = datetime.now(timezone.utc)
    print(f"It was {date.strftime('%A')} on {date.isoformat()}")

what_day_is_it("2021-01-01T02:00:19.180906")
# It was Friday on 2021-01-01T02:00:19.180906
18:49:42.898 | INFO | prefect.engine - Created flow run 'vermilion-spider' for flow 'what-day-is-it'
It was Friday on 2021-01-01T02:00:19.180906
18:49:42.963 | INFO | Flow run 'vermilion-spider' - Finished in state Completed()
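The call above passes a string where the flow expects a datetime. With validate_parameters enabled (the default, according to the settings table), the argument is converted before the flow body runs. The effect is roughly what you would get by parsing the ISO 8601 string yourself. The sketch below uses only the standard library to show that equivalent conversion; it is not how Prefect performs validation internally, which goes through Pydantic.

```python
from datetime import datetime

raw = "2021-01-01T02:00:19.180906"

# Parse the ISO 8601 string into a datetime, which is the type
# the flow's `date` parameter is annotated with.
date = datetime.fromisoformat(raw)

# Same formatting as in the body of what_day_is_it.
message = f"It was {date.strftime('%A')} on {date.isoformat()}"
print(message)
```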
Tasks
Task arguments
Tasks allow for customization through optional arguments:
Argument | Description |
---|---|
name | An optional name for the task. If not provided, the name will be inferred from the function name. |
description | An optional string description for the task. If not provided, the description will be pulled from the docstring for the decorated function. |
tags | An optional set of tags to be associated with runs of this task. These tags are combined with any tags defined by a prefect.tags context at task runtime. |
cache_key_fn | An optional callable that, given the task run context and call parameters, generates a string key. If the key matches a previous completed state, that state result will be restored instead of running the task again. |
cache_expiration | An optional amount of time indicating how long cached states for this task should be restorable; if not provided, cached states will never expire. |
retries | An optional number of times to retry on task run failure. |
retry_delay_seconds | An optional number of seconds to wait before retrying the task after failure. This is only applicable if retries is nonzero. |
log_prints | An optional boolean indicating whether to log print statements. |
persist_result | An optional boolean indicating whether to persist the result of the task run to storage. |
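The cache_key_fn and cache_expiration rows describe a lookup keyed on a string: if the key has been seen before and has not expired, the earlier result is reused. The following standard-library sketch illustrates that idea only. The names `make_key`, `cached_call`, and the in-memory `_cache` dictionary are hypothetical stand-ins; Prefect stores and restores task run states rather than using code like this.

```python
import hashlib
import json
import time

_cache = {}  # key -> (expires_at, result); hypothetical in-memory store


def make_key(parameters):
    """Derive a stable string key from the call parameters."""
    payload = json.dumps(parameters, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached_call(fn, parameters, expiration_seconds=None):
    """Return (result, was_cached), reusing an unexpired cached result."""
    key = make_key(parameters)
    entry = _cache.get(key)
    now = time.monotonic()
    # An entry with no expiry time never expires.
    if entry is not None and (entry[0] is None or now < entry[0]):
        return entry[1], True
    result = fn(**parameters)
    expires_at = None if expiration_seconds is None else now + expiration_seconds
    _cache[key] = (expires_at, result)
    return result, False


def add(x, y):
    return x + y


first = cached_call(add, {"x": 1, "y": 2})   # computed
second = cached_call(add, {"x": 1, "y": 2})  # same key, served from cache
third = cached_call(add, {"x": 2, "y": 2})   # different key, computed
print(first, second, third)
```

The key function decides what counts as "the same call": here only the parameters matter, so two calls with equal arguments share one result.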
from prefect import flow, task

@task
def my_first_task(msg):
    print(f"Hello, {msg}")

@task
def my_second_task(msg):
    my_first_task.fn(msg)
    return msg

@flow(flow_run_name="task test1")
def my_flow(msg: str = "Trillian"):
    my_second_task(msg)
    return msg

my_flow()
19:04:31.723 | INFO | prefect.engine - Created flow run 'fluorescent-bittern' for flow 'my-flow'
19:04:31.802 | INFO | Flow run 'task test1' - Created task run 'my_second_task-0' for task 'my_second_task'
19:04:31.804 | INFO | Flow run 'task test1' - Executing 'my_second_task-0' immediately...
Hello, Trillian
19:04:31.873 | INFO | Task run 'my_second_task-0' - Finished in state Completed()
19:04:31.904 | INFO | Flow run 'fluorescent-bittern' - Finished in state Completed()
'Trillian'
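In the example above, my_second_task calls my_first_task.fn(msg), which invokes the original undecorated function, so the log shows a task run for my_second_task only. A rough sketch of that pattern follows, with a hypothetical `SketchTask` wrapper standing in for Prefect's task object; the `tracked_runs` counter is an assumption made purely for illustration.

```python
class SketchTask:
    """Hypothetical wrapper: tracks calls, exposes the raw function as .fn."""

    def __init__(self, fn):
        self.fn = fn
        self.tracked_runs = 0

    def __call__(self, *args, **kwargs):
        # A normal call is tracked (Prefect would create a task run here).
        self.tracked_runs += 1
        return self.fn(*args, **kwargs)


@SketchTask
def greet(msg):
    return f"Hello, {msg}"


tracked = greet("Trillian")       # goes through the wrapper
untracked = greet.fn("Trillian")  # calls the original function directly
print(tracked, untracked, greet.tracked_runs)
```

Both calls return the same value, but only the first one is counted, which mirrors why a single task run appears in the output above.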
from prefect import flow, task
from prefect.runtime import flow_run, task_run

def generate_task_name():
    flow_name = flow_run.flow_name
    task_name = task_run.task_name

    parameters = task_run.parameters
    name = parameters["name"]
    limit = parameters["limit"]

    return f"{flow_name}-{task_name}-with-{name}-and-{limit}"

@task(name="my-example-task",
      description="An example task for a tutorial.",
      task_run_name=generate_task_name)
def my_task(name: str, limit: int = 100):
    pass

@flow
def my_flow(name: str):
    # creates a run with a name like "my-flow-my-example-task-with-marvin-and-100"
    my_task(name="marvin")
Retries
import httpx
from prefect import flow, task

@task(retries=2, retry_delay_seconds=5)
def get_data_task(
    url: str = "https://api.brittle-service.com/endpoint"
) -> dict:
    response = httpx.get(url)

    # If the response status code is anything but a 2xx, httpx will raise
    # an exception. This task doesn't handle the exception, so Prefect will
    # catch the exception and will consider the task run failed.
    response.raise_for_status()

    return response.json()

@flow
def get_data_flow():
    get_data_task()

get_data_flow()
19:08:29.596 | INFO | prefect.engine - Created flow run 'almond-spider' for flow 'get-data-flow'
19:08:29.655 | INFO | Flow run 'almond-spider' - Created task run 'get_data_task-0' for task 'get_data_task'
19:08:29.658 | INFO | Flow run 'almond-spider' - Executing 'get_data_task-0' immediately...
19:08:29.859 | ERROR | Task run 'get_data_task-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
yield
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 233, in handle_request
resp = self._pool.handle_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 216, in handle_request
raise exc from None
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 196, in handle_request
response = connection.handle_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 99, in handle_request
raise exc
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 76, in handle_request
stream = self._connect(request)
^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 122, in _connect
stream = self._network_backend.connect_tcp(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_backends/sync.py", line 205, in connect_tcp
with map_exceptions(exc_map):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.ConnectError: [Errno -2] Name or service not known
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/ipykernel_23433/1331835922.py", line 10, in get_data_task
response = httpx.get(url)
^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 198, in get
return request(
^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 106, in request
return client.request(
^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 827, in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 914, in send
response = self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth
response = self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 979, in _send_handling_redirects
response = self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 1015, in _send_single_request
response = transport.handle_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 232, in handle_request
with map_httpcore_exceptions():
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ConnectError: [Errno -2] Name or service not known
19:08:29.893 | INFO | Task run 'get_data_task-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
19:08:34.986 | ERROR | Task run 'get_data_task-0' - Encountered exception during execution:
httpx.ConnectError: [Errno -2] Name or service not known
19:08:35.018 | INFO | Task run 'get_data_task-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
19:08:40.124 | ERROR | Task run 'get_data_task-0' - Encountered exception during execution:
httpx.ConnectError: [Errno -2] Name or service not known
19:08:40.154 | ERROR | Task run 'get_data_task-0' - Finished in state Failed('Task run encountered an exception ConnectError: [Errno -2] Name or service not known')
19:08:40.156 | ERROR | Flow run 'almond-spider' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 69, in map_httpcore_exceptions
yield
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 233, in handle_request
resp = self._pool.handle_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 216, in handle_request
raise exc from None
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection_pool.py", line 196, in handle_request
response = connection.handle_request(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 99, in handle_request
raise exc
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 76, in handle_request
stream = self._connect(request)
^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_sync/connection.py", line 122, in _connect
stream = self._network_backend.connect_tcp(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_backends/sync.py", line 205, in connect_tcp
with map_exceptions(exc_map):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.ConnectError: [Errno -2] Name or service not known
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 867, in orchestrate_flow_run
result = await flow_call.aresult()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/ipykernel_23433/1331835922.py", line 22, in get_data_flow
get_data_task()
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py", line 600, in __call__
return enter_task_run_engine(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine
return from_sync.wait_for_call_in_loop_thread(begin_run)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
return call.result()
^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result
return self.future.result(timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async
result = await coro
^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1601, in get_task_call_return_value
return await future._result()
^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/ipykernel_23433/1331835922.py", line 10, in get_data_task
response = httpx.get(url)
^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 198, in get
return request(
^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_api.py", line 106, in request
return client.request(
^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 827, in request
return self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 914, in send
response = self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 942, in _send_handling_auth
response = self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 979, in _send_handling_redirects
response = self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_client.py", line 1015, in _send_single_request
response = transport.handle_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 232, in handle_request
with map_httpcore_exceptions():
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/contextlib.py", line 155, in __exit__
self.gen.throw(typ, value, traceback)
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py", line 86, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ConnectError: [Errno -2] Name or service not known
19:08:40.187 | ERROR | Flow run 'almond-spider' - Finished in state Failed('Flow run encountered an exception. ConnectError: [Errno -2] Name or service not known')
---------------------------------------------------------------------------
ConnectError Traceback (most recent call last)
...
File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpcore/_exceptions.py:14, in map_exceptions(map)
     13 if isinstance(exc, from_exc):
---> 14     raise to_exc(exc) from exc
     15 raise
ConnectError: [Errno -2] Name or service not known
The above exception was the direct cause of the following exception:
ConnectError Traceback (most recent call last)
Cell In[24], line 24
     20 @flow
     21 def get_data_flow():
     22     get_data_task()
---> 24 get_data_flow()
...
Cell In[24], line 22, in get_data_flow()
     20 @flow
     21 def get_data_flow():
---> 22     get_data_task()
...
Cell In[24], line 10, in get_data_task(url)
      6 @task(retries=2, retry_delay_seconds=5)
      7 def get_data_task(
      8     url: str = "https://api.brittle-service.com/endpoint"
      9 ) -> dict:
---> 10     response = httpx.get(url)
     12     # If the response status code is anything but a 2xx, httpx will raise
     13     # an exception. This task doesn't handle the exception, so Prefect will
     14     # catch the exception and will consider the task run failed.
     15     response.raise_for_status()
...
File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_transports/default.py:86, in map_httpcore_exceptions()
     83     raise
     85 message = str(exc)
---> 86 raise mapped_exc(message) from exc
ConnectError: [Errno -2] Name or service not known
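The failing task in the traceback above is declared with retries=2 and retry_delay_seconds=5, so a failed call is attempted again after a pause, up to two more times. The following is a minimal, standard-library sketch of that retry-with-delay idea. The names run_with_retries and flaky are hypothetical and exist only for illustration; this is not how Prefect's engine is implemented.

```python
import time


def run_with_retries(fn, retries=2, retry_delay_seconds=0.0):
    """Call fn(); on an exception, wait and try again up to `retries` more times.

    Hypothetical helper for illustration only, not part of Prefect.
    Returns a (result, attempts) tuple.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            # Out of retries: re-raise the last error to the caller.
            if attempts > retries:
                raise
            time.sleep(retry_delay_seconds)


calls = []


def flaky():
    # Simulated transient failure: fails on the first two calls, then succeeds.
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("temporary failure")
    return {"ok": True}


result, attempts = run_with_retries(flaky, retries=2, retry_delay_seconds=0)
print(result, attempts)
```

Retries only help with transient failures. A hostname that cannot be resolved, as in the ConnectError above, will most likely fail on every attempt, which is one reason to decide per exception whether a retry is worthwhile.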
import httpx
from prefect import flow, task


def retry_handler(task, task_run, state) -> bool:
    """This is a custom retry handler to handle when we want to retry a task"""
    try:
        # Attempt to get the result of the task
        state.result()
    except httpx.HTTPStatusError as exc:
        # Retry on any HTTP status code that is not 401 or 404
        do_not_retry_on_these_codes = [401, 404]
        return exc.response.status_code not in do_not_retry_on_these_codes
    except httpx.ConnectError:
        # Do not retry
        return False
    except:
        # For any other exception, retry
        return True


@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()


@flow
def get_data_flow(url):
    my_api_call_task(url=url)


if __name__ == "__main__":
    get_data_flow(url="https://httpbin.org/status/503")
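The branching in retry_handler can be checked without Prefect or a network connection. Below is a minimal sketch that mirrors its decision order. FakeHTTPStatusError, FakeConnectError and should_retry are hypothetical stand-ins introduced here for illustration; they are not part of httpx or Prefect, and the real handler receives a Prefect state rather than an exception.

```python
class FakeHTTPStatusError(Exception):
    """Hypothetical stand-in for httpx.HTTPStatusError."""

    def __init__(self, status_code):
        super().__init__(f"status {status_code}")
        self.status_code = status_code


class FakeConnectError(Exception):
    """Hypothetical stand-in for httpx.ConnectError."""


def should_retry(exc):
    """Mirror the branch order of retry_handler for an already-raised exception."""
    if isinstance(exc, FakeHTTPStatusError):
        # Retry on any status code except 401 and 404.
        return exc.status_code not in (401, 404)
    if isinstance(exc, FakeConnectError):
        # Connection failures are not retried.
        return False
    # Any other exception is retried.
    return True


decisions = {
    "503": should_retry(FakeHTTPStatusError(503)),
    "404": should_retry(FakeHTTPStatusError(404)),
    "connect": should_retry(FakeConnectError()),
    "other": should_retry(ValueError("boom")),
}
print(decisions)
```

Under these assumptions a 503 response is retried, so with retries=1 the call to https://httpbin.org/status/503 should be attempted twice before the task is marked as failed.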
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'get-data-flow' and defined at '/tmp/ipykernel_23433/2972459076.py:26' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
`@flow(name='my_unique_name', ...)`
warnings.warn(
19:16:36.208 | INFO | prefect.engine - Created flow run 'brilliant-bustard' for flow 'get-data-flow'
19:16:36.271 | INFO | Flow run 'brilliant-bustard' - Created task run 'my_api_call_task-0' for task 'my_api_call_task'
19:16:36.273 | INFO | Flow run 'brilliant-bustard' - Executing 'my_api_call_task-0' immediately...
19:16:37.330 | ERROR | Task run 'my_api_call_task-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/ipykernel_23433/2972459076.py", line 23, in my_api_call_task
response.raise_for_status()
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py", line 761, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
19:16:37.360 | INFO | Task run 'my_api_call_task-0' - Received non-final state 'AwaitingRetry' when proposing final state 'Failed' and will attempt to run again...
19:16:38.386 | ERROR | Task run 'my_api_call_task-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/ipykernel_23433/2972459076.py", line 23, in my_api_call_task
response.raise_for_status()
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py", line 761, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
19:16:38.415 | ERROR | Task run 'my_api_call_task-0' - Finished in state Failed("Task run encountered an exception HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503")
19:16:38.418 | ERROR | Flow run 'brilliant-bustard' - Encountered exception during execution: Traceback (most recent call last): File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 867, in orchestrate_flow_run result = await flow_call.aresult() ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult return await asyncio.wrap_future(self.future) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync result = self.fn(*self.args, **self.kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/tmp/ipykernel_23433/2972459076.py", line 28, in get_data_flow my_api_call_task(url=url) File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py", line 600, in __call__ return enter_task_run_engine( ^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1421, in enter_task_run_engine return from_sync.wait_for_call_in_loop_thread(begin_run) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread return call.result() ^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result return self.future.result(timeout=timeout) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File 
"/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async result = await coro ^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 1601, in get_task_call_return_value return await future._result() ^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result return await final_state.result(raise_on_failure=raise_on_failure, fetch=True) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result raise await get_state_exception(state) File "/tmp/ipykernel_23433/2972459076.py", line 8, in retry_handler state.result() File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/client/schemas/objects.py", line 224, in result return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 71, in get_state_result return _get_state_result(state, raise_on_failure=raise_on_failure) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/utilities/asyncutils.py", line 259, in coroutine_wrapper return call() ^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 432, in __call__ return self.result() ^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 318, in result return self.future.result(timeout=timeout) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 179, in 
result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 389, in _run_async result = await coro ^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result raise await get_state_exception(state) File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run result = await call.aresult() ^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult return await asyncio.wrap_future(self.future) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync result = self.fn(*self.args, **self.kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/tmp/ipykernel_23433/2972459076.py", line 23, in my_api_call_task response.raise_for_status() File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py", line 761, in raise_for_status raise HTTPStatusError(message, request=request, response=self) httpx.HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503' For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
19:16:38.449 | ERROR | Flow run 'brilliant-bustard' - Finished in state Failed("Flow run encountered an exception. HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503'\nFor more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503")
--------------------------------------------------------------------------- HTTPStatusError Traceback (most recent call last) Cell In[26], line 31 28 my_api_call_task(url=url) 30 if __name__ == "__main__": ---> 31 get_data_flow(url="https://httpbin.org/status/503") File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:1224, in Flow.__call__(self, return_state, wait_for, *args, **kwargs) 1219 if task_viz_tracker: 1220 # this is a subflow, for now return a single task and do not go further 1221 # we can add support for exploring subflows for tasks in the future. 1222 return track_viz_task(self.isasync, self.name, parameters) -> 1224 return enter_flow_run_engine_from_flow_call( 1225 self, 1226 parameters, 1227 wait_for=wait_for, 1228 return_type=return_type, 1229 ) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:297, in enter_flow_run_engine_from_flow_call(flow, parameters, wait_for, return_type) 290 retval = from_async.wait_for_call_in_loop_thread( 291 begin_run, 292 done_callbacks=done_callbacks, 293 contexts=contexts, 294 ) 296 else: --> 297 retval = from_sync.wait_for_call_in_loop_thread( 298 begin_run, 299 done_callbacks=done_callbacks, 300 contexts=contexts, 301 ) 303 return retval File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts) 241 stack.enter_context(context) 242 waiter.wait() --> 243 return call.result() File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout) 312 def result(self, timeout: Optional[float] = None) -> T: 313 """ 314 Wait for the result of the call. 315 316 Not safe for use from asynchronous contexts. 
317 """ --> 318 return self.future.result(timeout=timeout) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout) 177 raise CancelledError() 178 elif self._state == FINISHED: --> 179 return self.__get_result() 181 self._condition.wait(timeout) 183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 184 # Raise Prefect cancelled error instead of 185 # `concurrent.futures._base.CancelledError` File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self) 399 if self._exception: 400 try: --> 401 raise self._exception 402 finally: 403 # Break a reference cycle with the exception in self._exception 404 self = None File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***) 387 with self.future.enforce_async_deadline() as cancel_scope: 388 try: --> 389 result = await coro 390 finally: 391 # Forget this call's arguments in order to free up any memory 392 # that may be referenced by them; after a call has happened, 393 # there's no need to keep a reference to them 394 self.args = None File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/client/utilities.py:78, in inject_client.<locals>.with_injected_client(*args, **kwargs) 76 async with context as new_client: 77 kwargs.setdefault("client", new_client or client) ---> 78 return await fn(*args, **kwargs) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:400, in create_then_begin_flow_run(flow, parameters, wait_for, return_type, client, user_thread) 398 return state 399 elif return_type == "result": --> 400 return await state.result(fetch=True) 401 else: 402 raise ValueError(f"Invalid return type for flow engine {return_type!r}.") File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure) 84 raise 
UnfinishedRun( 85 f"Run is in {state.type.name} state, its result is not available." 86 ) 88 if raise_on_failure and ( 89 state.is_crashed() or state.is_failed() or state.is_cancelled() 90 ): ---> 91 raise await get_state_exception(state) 93 if isinstance(state.data, DataDocument): 94 result = result_from_state_with_data_document( 95 state, raise_on_failure=raise_on_failure 96 ) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:867, in orchestrate_flow_run(flow, flow_run, parameters, wait_for, interruptible, client, partial_flow_run_context, user_thread) 862 else: 863 from_async.call_soon_in_new_thread( 864 flow_call, timeout=flow.timeout_seconds 865 ) --> 867 result = await flow_call.aresult() 869 waited_for_task_runs = await wait_for_task_runs_and_report_crashes( 870 flow_run_context.task_run_futures, client=client 871 ) 872 except PausedRun as exc: 873 # could get raised either via utility or by returning Paused from a task run 874 # if a task run pauses, we set its state as the flow's state 875 # to preserve reschedule and timeout behavior File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:327, in Call.aresult(self) 321 """ 322 Wait for the result of the call. 323 324 For use from asynchronous contexts. 
325 """ 326 try: --> 327 return await asyncio.wrap_future(self.future) 328 except asyncio.CancelledError as exc: 329 raise CancelledError() from exc File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:352, in Call._run_sync(***failed resolving arguments***) 350 with self.future.enforce_sync_deadline() as cancel_scope: 351 try: --> 352 result = self.fn(*self.args, **self.kwargs) 353 finally: 354 # Forget this call's arguments in order to free up any memory 355 # that may be referenced by them; after a call has happened, 356 # there's no need to keep a reference to them 357 self.args = None Cell In[26], line 28, in get_data_flow(url) 26 @flow 27 def get_data_flow(url): ---> 28 my_api_call_task(url=url) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:600, in Task.__call__(self, return_state, wait_for, *args, **kwargs) 589 from prefect import get_client 591 return submit_autonomous_task_run_to_engine( 592 task=self, 593 task_run=None, (...) 
597 client=get_client(), 598 ) --> 600 return enter_task_run_engine( 601 self, 602 parameters=parameters, 603 wait_for=wait_for, 604 task_runner=SequentialTaskRunner(), 605 return_type=return_type, 606 mapped=False, 607 ) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:1421, in enter_task_run_engine(task, parameters, wait_for, return_type, task_runner, mapped) 1419 return from_async.wait_for_call_in_loop_thread(begin_run) 1420 else: -> 1421 return from_sync.wait_for_call_in_loop_thread(begin_run) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py:243, in from_sync.wait_for_call_in_loop_thread(_from_sync__call, timeout, done_callbacks, contexts) 241 stack.enter_context(context) 242 waiter.wait() --> 243 return call.result() File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout) 312 def result(self, timeout: Optional[float] = None) -> T: 313 """ 314 Wait for the result of the call. 315 316 Not safe for use from asynchronous contexts. 
317 """ --> 318 return self.future.result(timeout=timeout) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout) 177 raise CancelledError() 178 elif self._state == FINISHED: --> 179 return self.__get_result() 181 self._condition.wait(timeout) 183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 184 # Raise Prefect cancelled error instead of 185 # `concurrent.futures._base.CancelledError` File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self) 399 if self._exception: 400 try: --> 401 raise self._exception 402 finally: 403 # Break a reference cycle with the exception in self._exception 404 self = None File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***) 387 with self.future.enforce_async_deadline() as cancel_scope: 388 try: --> 389 result = await coro 390 finally: 391 # Forget this call's arguments in order to free up any memory 392 # that may be referenced by them; after a call has happened, 393 # there's no need to keep a reference to them 394 self.args = None File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:1601, in get_task_call_return_value(task, flow_run_context, parameters, wait_for, return_type, task_runner, extra_task_inputs) 1599 return await future._wait() 1600 elif return_type == "result": -> 1601 return await future._result() 1602 else: 1603 raise ValueError(f"Invalid return type for task engine {return_type!r}.") File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/futures.py:237, in PrefectFuture._result(self, timeout, raise_on_failure) 235 if not final_state: 236 raise TimeoutError("Call timed out before task finished.") --> 237 return await final_state.result(raise_on_failure=raise_on_failure, fetch=True) File 
~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure) 84 raise UnfinishedRun( 85 f"Run is in {state.type.name} state, its result is not available." 86 ) 88 if raise_on_failure and ( 89 state.is_crashed() or state.is_failed() or state.is_cancelled() 90 ): ---> 91 raise await get_state_exception(state) 93 if isinstance(state.data, DataDocument): 94 result = result_from_state_with_data_document( 95 state, raise_on_failure=raise_on_failure 96 ) Cell In[26], line 8, in retry_handler(task, task_run, state) 5 """This is a custom retry handler to handle when we want to retry a task""" 6 try: 7 # Attempt to get the result of the task ----> 8 state.result() 9 except httpx.HTTPStatusError as exc: 10 # Retry on any HTTP status code that is not 401 or 404 11 do_not_retry_on_these_codes = [401, 404] File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/client/schemas/objects.py:224, in State.result(self, raise_on_failure, fetch) 152 """ 153 Retrieve the result attached to this state. 154 (...) 
220 hello 221 """ 222 from prefect.states import get_state_result --> 224 return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:71, in get_state_result(state, raise_on_failure, fetch) 69 return state.data 70 else: ---> 71 return _get_state_result(state, raise_on_failure=raise_on_failure) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/utilities/asyncutils.py:259, in sync_compatible.<locals>.coroutine_wrapper(*args, **kwargs) 257 # Run in a new event loop, but use a `Call` for nested context detection 258 call = create_call(async_fn, *args, **kwargs) --> 259 return call() File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:432, in Call.__call__(self) 430 return run_and_return_result() 431 else: --> 432 return self.result() File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:318, in Call.result(self, timeout) 312 def result(self, timeout: Optional[float] = None) -> T: 313 """ 314 Wait for the result of the call. 315 316 Not safe for use from asynchronous contexts. 
317 """ --> 318 return self.future.result(timeout=timeout) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:179, in Future.result(self, timeout) 177 raise CancelledError() 178 elif self._state == FINISHED: --> 179 return self.__get_result() 181 self._condition.wait(timeout) 183 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 184 # Raise Prefect cancelled error instead of 185 # `concurrent.futures._base.CancelledError` File ~/mambaforge/envs/cfast/lib/python3.11/concurrent/futures/_base.py:401, in Future.__get_result(self) 399 if self._exception: 400 try: --> 401 raise self._exception 402 finally: 403 # Break a reference cycle with the exception in self._exception 404 self = None File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:389, in Call._run_async(***failed resolving arguments***) 387 with self.future.enforce_async_deadline() as cancel_scope: 388 try: --> 389 result = await coro 390 finally: 391 # Forget this call's arguments in order to free up any memory 392 # that may be referenced by them; after a call has happened, 393 # there's no need to keep a reference to them 394 self.args = None File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/states.py:91, in _get_state_result(state, raise_on_failure) 84 raise UnfinishedRun( 85 f"Run is in {state.type.name} state, its result is not available." 
86 ) 88 if raise_on_failure and ( 89 state.is_crashed() or state.is_failed() or state.is_cancelled() 90 ): ---> 91 raise await get_state_exception(state) 93 if isinstance(state.data, DataDocument): 94 result = result_from_state_with_data_document( 95 state, raise_on_failure=raise_on_failure 96 ) File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py:2147, in orchestrate_task_run(task, task_run, parameters, wait_for, result_factory, log_prints, interruptible, client) 2140 logger.debug( 2141 "Beginning execution...", extra={"state_message": True} 2142 ) 2144 call = from_async.call_soon_in_new_thread( 2145 create_call(task.fn, *args, **kwargs), timeout=task.timeout_seconds 2146 ) -> 2147 result = await call.aresult() 2149 except (CancelledError, asyncio.CancelledError) as exc: 2150 if not call.timedout(): 2151 # If the task call was not cancelled by us; this is a crash File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:327, in Call.aresult(self) 321 """ 322 Wait for the result of the call. 323 324 For use from asynchronous contexts. 
325 """ 326 try: --> 327 return await asyncio.wrap_future(self.future) 328 except asyncio.CancelledError as exc: 329 raise CancelledError() from exc File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py:352, in Call._run_sync(***failed resolving arguments***) 350 with self.future.enforce_sync_deadline() as cancel_scope: 351 try: --> 352 result = self.fn(*self.args, **self.kwargs) 353 finally: 354 # Forget this call's arguments in order to free up any memory 355 # that may be referenced by them; after a call has happened, 356 # there's no need to keep a reference to them 357 self.args = None Cell In[26], line 23, in my_api_call_task(url) 20 @task(retries=1, retry_condition_fn=retry_handler) 21 def my_api_call_task(url): 22 response = httpx.get(url) ---> 23 response.raise_for_status() 24 return response.json() File ~/mambaforge/envs/cfast/lib/python3.11/site-packages/httpx/_models.py:761, in Response.raise_for_status(self) 759 error_type = error_types.get(status_class, "Invalid status code") 760 message = message.format(self, error_type=error_type) --> 761 raise HTTPStatusError(message, request=request, response=self) HTTPStatusError: Server error '503 SERVICE UNAVAILABLE' for url 'https://httpbin.org/status/503' For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/503
Timeouts
from prefect import task, get_run_logger
import time

@task(timeout_seconds=1)
def show_timeouts():
    logger = get_run_logger()
    logger.info("I will execute")
    # sleeping for 5 seconds exceeds the 1-second timeout
    time.sleep(5)
    logger.info("I will not execute")
Wait for
from prefect import flow, task

@task
def task_1():
    pass

@task
def task_2():
    pass

@flow
def my_flow():
    x = task_1()

    # task 2 will wait for task_1 to complete
    y = task_2(wait_for=[x])
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_23433/1697639020.py:9' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
`@flow(name='my_unique_name', ...)`
warnings.warn(
Maps
from prefect import flow, task

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    squared_nums = square_num.map(nums)
    print_nums(squared_nums)

map_flow([1, 2, 3, 5, 8, 13])
19:23:30.843 | INFO | prefect.engine - Created flow run 'stereotyped-wolf' for flow 'map-flow'
19:23:30.904 | INFO | Flow run 'stereotyped-wolf' - Created task run 'print_nums-0' for task 'print_nums'
19:23:30.906 | INFO | Flow run 'stereotyped-wolf' - Executing 'print_nums-0' immediately...
1
2
3
5
8
13
19:23:30.981 | INFO | Task run 'print_nums-0' - Finished in state Completed()
19:23:31.067 | INFO | Flow run 'stereotyped-wolf' - Created task run 'square_num-3' for task 'square_num'
19:23:31.069 | INFO | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-3' for execution.
19:23:31.085 | INFO | Flow run 'stereotyped-wolf' - Created task run 'square_num-2' for task 'square_num'
19:23:31.087 | INFO | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-2' for execution.
19:23:31.105 | INFO | Flow run 'stereotyped-wolf' - Created task run 'square_num-4' for task 'square_num'
19:23:31.106 | INFO | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-4' for execution.
19:23:31.176 | INFO | Flow run 'stereotyped-wolf' - Created task run 'square_num-1' for task 'square_num'
19:23:31.179 | INFO | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-1' for execution.
19:23:31.216 | INFO | Task run 'square_num-3' - Finished in state Completed()
19:23:31.233 | INFO | Task run 'square_num-4' - Finished in state Completed()
19:23:31.244 | INFO | Flow run 'stereotyped-wolf' - Created task run 'square_num-5' for task 'square_num'
19:23:31.246 | INFO | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-5' for execution.
19:23:31.331 | INFO | Task run 'square_num-5' - Finished in state Completed()
19:23:31.352 | INFO | Task run 'square_num-1' - Finished in state Completed()
19:23:31.396 | INFO | Flow run 'stereotyped-wolf' - Created task run 'square_num-0' for task 'square_num'
19:23:31.399 | INFO | Flow run 'stereotyped-wolf' - Submitted task run 'square_num-0' for execution.
19:23:31.452 | INFO | Flow run 'stereotyped-wolf' - Created task run 'print_nums-1' for task 'print_nums'
19:23:31.453 | INFO | Flow run 'stereotyped-wolf' - Executing 'print_nums-1' immediately...
19:23:31.477 | INFO | Task run 'square_num-0' - Finished in state Completed()
19:23:31.615 | INFO | Task run 'square_num-2' - Finished in state Completed()
1
4
9
25
64
169
19:23:31.681 | INFO | Task run 'print_nums-1' - Finished in state Completed()
19:23:31.710 | INFO | Flow run 'stereotyped-wolf' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `int`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]
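In the log above, the mapped square_num runs finish in a different order than they were created, yet the squares print in input order. A rough standard-library analogy (not Prefect's implementation) is to submit one job per element to a thread pool and collect the results in submission order. The random sleep is an assumption added only to make the completion order vary.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def square_num(num):
    # Hypothetical jitter so jobs finish in an unpredictable order.
    time.sleep(random.uniform(0, 0.01))
    return num ** 2


nums = [1, 2, 3, 5, 8, 13]

with ThreadPoolExecutor(max_workers=4) as pool:
    # One future per input element, kept in input order.
    futures = [pool.submit(square_num, n) for n in nums]
    # Reading results in submission order restores input order,
    # regardless of which job finished first.
    squared_nums = [f.result() for f in futures]

print(squared_nums)
```

The point of the sketch is only that ordering of results follows the order of the inputs, not the order of completion.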
Async
import asyncio
from prefect import task, flow

@task
async def print_values(values):
    for value in values:
        await asyncio.sleep(1)  # yield
        print(value, end=" ")

@flow
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]

    # asynchronously gather the tasks
    await asyncio.gather(*coros)

asyncio.run(async_flow())
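Stripped of the Prefect decorators, the same control flow can be sketched with plain asyncio. The first call is awaited on its own, so it finishes before the other two start; the two gathered calls then interleave. The `seen` list and the shortened sleep are assumptions added so the ordering can be inspected.

```python
import asyncio


async def print_values(values, seen):
    for value in values:
        await asyncio.sleep(0.01)  # yield control to other coroutines
        seen.append(value)


async def main():
    seen = []
    # Awaited directly: completes before anything else is started.
    await print_values([1, 2], seen)
    # Gathered: both coroutines make progress concurrently.
    await asyncio.gather(
        print_values("abcd", seen),
        print_values("6789", seen),
    )
    return seen


seen = asyncio.run(main())
print(seen)
```

Within each gathered call the values keep their own order; how the two sequences interleave with each other is up to the event loop.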
from prefect import get_client

# top-level `async with` works in a notebook cell; in a script, wrap this in an async function
async with get_client() as client:
    # set a concurrency limit of 10 on the 'small_instance' tag
    limit_id = await client.create_concurrency_limit(
        tag="small_instance",
        concurrency_limit=10
    )
!prefect concurrency-limit inspect small_instance
╭──────────────────────────────────────────────────────────────────────────╮
│ Concurrency Limit ID: 879f2e40-8387-47c5-af34-0b164f7ea8bc │
│ ┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓ │
│ ┃ Tag ┃ Concurrency Limit ┃ Created ┃ Updated ┃ │
│ ┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩ │
│ │ small_instance │ 10 │ '1 minute ago' │ '1 minute ago' │ │
│ └────────────────┴───────────────────┴────────────────┴────────────────┘ │
│ ┏━━━━━━━━━━━━━━━━━━━━━┓ │
│ ┃ Active Task Run IDs ┃ │
│ ┡━━━━━━━━━━━━━━━━━━━━━┩ │
│ └─────────────────────┘ │
╰──────────────────────────────────────────────────────────────────────────╯
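To get a feel for what a tag limit of 10 means, here is a standard-library analogy using an asyncio semaphore. It is only a sketch of the idea (at most 10 tagged task runs at once, the rest wait for a free slot), not how Prefect enforces the limit. The function names and the counters are hypothetical.

```python
import asyncio


async def run_tagged_task(limit, state):
    # Wait for a free slot, as a task run with a limited tag would.
    async with limit:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for real work
        state["running"] -= 1


async def main(n_tasks=25, concurrency_limit=10):
    limit = asyncio.Semaphore(concurrency_limit)
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(run_tagged_task(limit, state) for _ in range(n_tasks)))
    return state


state = asyncio.run(main())
print("peak concurrent runs:", state["peak"])
```

Even though 25 runs are started together, the peak never goes above the limit.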
Deployments
name: prefect.yaml
# Welcome to your prefect.yaml file! You can use this file for storing and managing
# configuration for deploying your flows. We recommend committing this file to source
# control along with your flow code.

# Generic metadata about this project
name: nbs
prefect-version: 2.16.8

# build section allows you to manage and build docker images
build:

# push section allows you to manage if and how this project is uploaded to remote locations
push:

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.git_clone:
    repository: git@github.com:bthek1/MLtools.git
    branch: main

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: slow_flow
  version:
  tags: []
  description: Sleepy flow - sleeps the provided amount of time (in seconds).
  entrypoint: nbs/prefect_deployment_serve.py:slow_flow
  parameters: {}
  work_pool:
    name: test-pool
    work_queue_name:
    job_variables: {}
  schedules:
  - interval: 30.0
    anchor_date: '2024-04-03T13:38:23.549390+00:00'
    timezone: UTC
    active: true
- name: fast_flow
  version:
  tags: []
  description: Fastest flow this side of the Mississippi.
  entrypoint: nbs/prefect_deployment_serve.py:fast_flow
  parameters: {}
  work_pool:
    name: test-pool
    work_queue_name:
    job_variables: {}
  schedules:
  - interval: 60.0
    anchor_date: '2024-04-03T14:13:30.384393+00:00'
    timezone: UTC
    active: true
  - interval: 150.0
    anchor_date: '2024-04-03T14:13:45.806620+00:00'
    timezone: UTC
    active: false
Work Pools : To do
Work pool overview
Work pools organize work for execution. Work pools have types corresponding to the infrastructure that will execute the flow code, as well as the delivery method of work to that environment. Pull work pools require workers (or less ideally, agents) to poll the work pool for flow runs to execute. Push work pools can submit runs directly to your serverless infrastructure providers such as Google Cloud Run, Azure Container Instances, and AWS ECS without the need for an agent or worker.
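The difference between the two delivery methods can be sketched in a few lines of plain Python. This is a toy model under stated assumptions, not Prefect's API: the class names `PullWorkPool` and `PushWorkPool`, their methods, and the run names are all hypothetical.

```python
from collections import deque


class PullWorkPool:
    """Holds scheduled runs until a worker polls for them."""

    def __init__(self):
        self._scheduled = deque()

    def schedule(self, flow_run):
        self._scheduled.append(flow_run)

    def poll(self):
        # Called by a worker; returns the next run, or None if idle.
        return self._scheduled.popleft() if self._scheduled else None


class PushWorkPool:
    """Hands each run straight to an infrastructure callable; no worker."""

    def __init__(self, submit_to_infrastructure):
        self._submit = submit_to_infrastructure

    def schedule(self, flow_run):
        return self._submit(flow_run)


executed = []

# Pull: nothing runs until a worker asks for work.
pull_pool = PullWorkPool()
pull_pool.schedule("run-a")
pending_before_poll = list(executed)
run = pull_pool.poll()
while run is not None:
    executed.append(("worker", run))
    run = pull_pool.poll()

# Push: scheduling the run submits it immediately.
push_pool = PushWorkPool(lambda r: executed.append(("serverless", r)))
push_pool.schedule("run-b")

print(executed)
```

In the pull case a run sits in the pool until something polls; in the push case there is no polling loop at all.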
prefect work-pool create test-pool
!prefect work-pool ls
Work Pools
┏━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━┓
┃ Name ┃ Type ┃ ID ┃ Concurrency Lim… ┃
┡━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━┩
│ test-pool │ proce… │ 5d41c025-ec45-4e64-9ef6-3ca91b9684e0 │ None │
└───────────┴────────┴──────────────────────────────────────┴──────────────────┘
(**) denotes a paused pool
!prefect work-pool inspect 'test-pool'
WorkPool(
id='5d41c025-ec45-4e64-9ef6-3ca91b9684e0',
created=DateTime(2024, 4, 3, 10, 12, 27, 298785, tzinfo=Timezone('+00:00')),
updated=DateTime(2024, 4, 3, 10, 12, 27, 306000, tzinfo=Timezone('+00:00')),
name='test-pool',
type='process',
base_job_template={
'job_configuration': {
'command': '{{ command }}',
'env': '{{ env }}',
'labels': '{{ labels }}',
'name': '{{ name }}',
'stream_output': '{{ stream_output }}',
'working_dir': '{{ working_dir }}'
},
'variables': {
'type': 'object',
'properties': {
'name': {
'title': 'Name',
'description': 'Name given to infrastructure created by a
worker.',
'type': 'string'
},
'env': {
'title': 'Environment Variables',
'description': 'Environment variables to set when starting a
flow run.',
'type': 'object',
'additionalProperties': {'type': 'string'}
},
'labels': {
'title': 'Labels',
'description': 'Labels applied to infrastructure created by
a worker.',
'type': 'object',
'additionalProperties': {'type': 'string'}
},
'command': {
'title': 'Command',
'description': 'The command to use when starting a flow run.
In most cases, this should be left blank and the command will be automatically
generated by the worker.',
'type': 'string'
},
'stream_output': {
'title': 'Stream Output',
'description': 'If enabled, workers will stream output from
flow run processes to local standard output.',
'default': True,
'type': 'boolean'
},
'working_dir': {
'title': 'Working Directory',
'description': 'If provided, workers will open flow run
processes within the specified path as the working directory. Otherwise, a
temporary directory will be created.',
'type': 'string',
'format': 'path'
}
}
}
},
status=WorkPoolStatus.NOT_READY,
default_queue_id='12cd8c55-db98-4e1c-baf6-ffffcfbb613f'
)
Schedules : To do
Prefect supports several types of schedules that cover a wide range of use cases and offer a large degree of customization:
- Cron is most appropriate for users who are already familiar with cron from previous use.
- Interval is best suited for deployments that need to run at some consistent cadence that isn’t related to absolute time.
- RRule is best suited for deployments that rely on calendar logic for simple recurring schedules, irregular intervals, exclusions, or day-of-month adjustments.
!!! tip “Schedules can be inactive”
    When you create or edit a schedule, you can set the active property to False in Python (or false in a YAML file) to deactivate the schedule. This is useful if you want to keep the schedule configuration but temporarily stop the schedule from creating new flow runs.
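As a small worked example of an interval schedule, the sketch below computes upcoming run times under the assumption that runs fall on anchor_date + n * interval, and skips any schedule whose active flag is false. It uses only the standard library and is not Prefect's scheduler; the helper `next_interval_runs`, the `now` value, and the anchor dates (rounded to whole seconds from the fast_flow deployment) are illustrative.

```python
import math
from datetime import datetime, timedelta, timezone


def next_interval_runs(anchor, interval_seconds, after, count=3):
    """Return the next `count` times anchor + n * interval strictly after `after`."""
    interval = timedelta(seconds=interval_seconds)
    # Number of whole intervals elapsed since the anchor, then move one ahead.
    n = math.floor((after - anchor) / interval) + 1
    return [anchor + (n + i) * interval for i in range(count)]


# Mirrors the shape of the `schedules` entries in prefect.yaml.
schedules = [
    {"interval": 60.0, "anchor_date": "2024-04-03T14:13:30+00:00", "active": True},
    {"interval": 150.0, "anchor_date": "2024-04-03T14:13:45+00:00", "active": False},
]

now = datetime(2024, 4, 3, 15, 0, 0, tzinfo=timezone.utc)

upcoming = {
    s["interval"]: next_interval_runs(
        datetime.fromisoformat(s["anchor_date"]), s["interval"], now
    )
    for s in schedules
    if s["active"]  # inactive schedules create no runs
}

for interval, runs in upcoming.items():
    print(interval, [r.isoformat() for r in runs])
```

Note that the anchor is a reference point for the cadence rather than a start time, so the computed runs stay aligned to it no matter when `now` is.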
Results
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    future = my_task.submit()
    return future.result() + 1

result = my_flow()
assert result == 2
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/tasks.py:348: UserWarning: A task named 'my_task' and defined at '/tmp/ipykernel_44036/1506838836.py:3' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:
`@task(name='my_unique_name', ...)`
warnings.warn(
/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/flows.py:357: UserWarning: A flow named 'my-flow' and defined at '/tmp/ipykernel_44036/1506838836.py:7' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:
`@flow(name='my_unique_name', ...)`
warnings.warn(
21:13:16.692 | INFO | prefect.engine - Created flow run 'ultra-capuchin' for flow 'my-flow'
21:13:16.756 | INFO | Flow run 'ultra-capuchin' - Created task run 'my_task-0' for task 'my_task'
21:13:16.758 | INFO | Flow run 'ultra-capuchin' - Submitted task run 'my_task-0' for execution.
21:13:16.833 | INFO | Task run 'my_task-0' - Finished in state Completed()
21:13:16.862 | INFO | Flow run 'ultra-capuchin' - Finished in state Completed()
Error handling
from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    state = my_task(return_state=True)

    if state.is_failed():
        print("Oh no! The task failed. Falling back to '1'.")
        result = 1
    else:
        result = state.result()

    return result + 1

result = my_flow()
assert result == 2
21:24:04.344 | INFO | prefect.engine - Created flow run 'active-flamingo' for flow 'my-flow'
21:24:04.406 | INFO | Flow run 'active-flamingo' - Created task run 'my_task-0' for task 'my_task'
21:24:04.407 | INFO | Flow run 'active-flamingo' - Executing 'my_task-0' immediately...
21:24:04.454 | ERROR | Task run 'my_task-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/engine.py", line 2147, in orchestrate_task_run
result = await call.aresult()
^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
return await asyncio.wrap_future(self.future)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ben/mambaforge/envs/cfast/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/tmp/ipykernel_44036/2193582311.py", line 5, in my_task
raise ValueError()
ValueError
21:24:04.485 | ERROR | Task run 'my_task-0' - Finished in state Failed('Task run encountered an exception ValueError: ')
Oh no! The task failed. Falling back to '1'.
21:24:04.516 | INFO | Flow run 'active-flamingo' - Finished in state Completed()
Artifacts
from prefect import flow, task
from prefect.artifacts import create_link_artifact

@task
def my_first_task():
    create_link_artifact(
        key="create-link-artifact",
        link="my_first_task",
        description="## my_first_task",
    )

@task
def my_second_task():
    create_link_artifact(
        key="create-link-artifact",
        link="my_second_task",
        description="## my_second_task",
    )

@flow
def my_flow():
    create_link_artifact(
        key="create-link-artifact",
        link="my_flow",
        description="## my_flow",
    )
    my_first_task()
    my_second_task()

if __name__ == "__main__":
    my_flow()
21:31:10.608 | INFO | prefect.engine - Created flow run 'important-limpet' for flow 'my-flow'
21:31:10.688 | INFO | Flow run 'important-limpet' - Created task run 'my_first_task-0' for task 'my_first_task'
21:31:10.690 | INFO | Flow run 'important-limpet' - Executing 'my_first_task-0' immediately...
21:31:10.782 | INFO | Task run 'my_first_task-0' - Finished in state Completed()
21:31:10.803 | INFO | Flow run 'important-limpet' - Created task run 'my_second_task-0' for task 'my_second_task'
21:31:10.805 | INFO | Flow run 'important-limpet' - Executing 'my_second_task-0' immediately...
21:31:10.903 | INFO | Task run 'my_second_task-0' - Finished in state Completed()
21:31:10.932 | INFO | Flow run 'important-limpet' - Finished in state Completed('All states completed.')
from prefect import flow
from prefect.artifacts import create_link_artifact

@flow
def my_flow():
    create_link_artifact(
        key="my-important-link",
        link="https://www.prefect.io/",
        link_text="Prefect",
    )

if __name__ == "__main__":
    my_flow()
21:57:21.601 | INFO | prefect.engine - Created flow run 'tomato-eagle' for flow 'my-flow'
21:57:21.688 | INFO | Flow run 'tomato-eagle' - Finished in state Completed()
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact

@task
def markdown_task():
    na_revenue = 500000
    markdown_report = f"""# Sales Report

## Summary

In the past quarter, our company saw a significant increase in sales, with a total revenue of $1,000,000.
This represents a 20% increase over the same period last year.

## Sales by Region

| Region        | Revenue |
|:--------------|-------:|
| North America | ${na_revenue:,} |
| Europe        | $250,000 |
| Asia          | $150,000 |
| South America | $75,000 |
| Africa        | $25,000 |

## Top Products

1. Product A - $300,000 in revenue
2. Product B - $200,000 in revenue
3. Product C - $150,000 in revenue

## Conclusion

Overall, these results are very encouraging and demonstrate the success of our sales team in increasing revenue
across all regions. However, we still have room for improvement and should focus on further increasing sales in
the coming quarter.
"""
    create_markdown_artifact(
        key="gtm-report",
        markdown=markdown_report,
        description="Quarterly Sales Report",
    )

@flow()
def my_flow():
    markdown_task()

if __name__ == "__main__":
    my_flow()
21:58:42.196 | INFO | prefect.engine - Created flow run 'sociable-jaguarundi' for flow 'my-flow'
21:58:42.266 | INFO | Flow run 'sociable-jaguarundi' - Created task run 'markdown_task-0' for task 'markdown_task'
21:58:42.267 | INFO | Flow run 'sociable-jaguarundi' - Executing 'markdown_task-0' immediately...
21:58:42.360 | INFO | Task run 'markdown_task-0' - Finished in state Completed()
21:58:42.392 | INFO | Flow run 'sociable-jaguarundi' - Finished in state Completed('All states completed.')
States
When calling a task or a flow, there are three types of returned values:
- Data: A Python object (such as int, str, dict, list, and so on).
- State: A Prefect object indicating the state of a flow or task run.
- PrefectFuture: A Prefect object that contains both data and State.
Returning data is the default behavior any time you call your_task().
Returning Prefect State occurs anytime you call your task or flow with the argument return_state=True.
Returning PrefectFuture is achieved by calling your_task.submit().
from prefect import flow

def my_success_hook(flow, flow_run, state):
    print("Flow run succeeded!")

@flow(on_completion=[my_success_hook])
def my_flow():
    return 42

my_flow()
22:27:55.866 | INFO | prefect.engine - Created flow run 'lime-bullmastiff' for flow 'my-flow'
22:27:55.935 | INFO | Flow run 'lime-bullmastiff' - Running hook 'my_success_hook' in response to entering state 'Completed'
Flow run succeeded!
22:27:55.939 | INFO | Flow run 'lime-bullmastiff' - Hook 'my_success_hook' finished running successfully
22:27:55.941 | INFO | Flow run 'lime-bullmastiff' - Finished in state Completed()
42
Create flow run state change hooks
from prefect import Flow, flow
from prefect.client.schemas.objects import FlowRun, State

def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State):
    """This is the required signature for a flow run state
    change hook. This hook can only be passed into flows.
    """

# pass hook as a list of callables
@flow(on_completion=[my_flow_hook])
def my_flow():
    return 42
Create task run state change hooks
from prefect import Task, task
from prefect.client.schemas.objects import State, TaskRun

def my_task_hook(task: Task, task_run: TaskRun, state: State):
    """This is the required signature for a task run state change
    hook. This hook can only be passed into tasks.
    """

# pass hook as a list of callables
@task(on_failure=[my_task_hook])
def my_task():
    raise ValueError()
Serve
from prefect import flow

@flow(log_prints=True)
def hello_world(name: str = "world", goodbye: bool = False):
    print(f"Hello {name} from Prefect! 🤗")

    if goodbye:
        print(f"Goodbye {name}!")

if __name__ == "__main__":
    # creates a deployment and stays running to monitor for work instructions generated on the server
    hello_world.serve(name="my-first-deployment",
                      tags=["onboarding"],
                      parameters={"goodbye": True},
                      interval=60)
import time
from prefect import flow, serve

@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)

@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return

if __name__ == "__main__":
    # build one deployment per flow, then serve both from a single process
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)