Objects for creating and configuring deployments for flows using serve functionality.
Example
import time

from prefect import flow, serve


@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)


@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return


if __name__ == "__main__":
    # to_deployment creates RunnerDeployment instances
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)
class DeploymentImage:
    """
    Configuration used to build and push a Docker image for a deployment.

    Attributes:
        name: The name of the Docker image to build, including the registry and
            repository.
        tag: The tag to apply to the built image.
        dockerfile: The path to the Dockerfile to use for building the image. If
            not provided, a default Dockerfile will be generated.
        **build_kwargs: Additional keyword arguments to pass to the Docker build
            request. See the
            [`docker-py` documentation](https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build)
            for more information.
    """

    def __init__(self, name, tag=None, dockerfile="auto", **build_kwargs):
        # `name` may already embed a tag (e.g. "repo/image:v1"); split it out so
        # a conflicting explicit `tag` argument can be rejected below.
        image_name, image_tag = parse_image_tag(name)
        if tag and image_tag:
            raise ValueError(
                f"Only one tag can be provided - both {image_tag!r} and {tag!r} were"
                " provided as tags."
            )
        namespace, repository = split_repository_path(image_name)
        # if the provided image name does not include a namespace (registry URL or user/org name),
        # use the default namespace
        if not namespace:
            namespace = PREFECT_DEFAULT_DOCKER_BUILD_NAMESPACE.value()
        # join the namespace and repository to create the full image name
        # ignore namespace if it is None
        self.name = "/".join(filter(None, [namespace, repository]))
        # Fall back to a slugified UTC timestamp so every build gets a unique tag.
        self.tag = tag or image_tag or slugify(pendulum.now("utc").isoformat())
        self.dockerfile = dockerfile
        self.build_kwargs = build_kwargs

    @property
    def reference(self):
        # Full image reference in `name:tag` form, suitable for docker push/pull.
        return f"{self.name}:{self.tag}"

    def build(self):
        """Build the Docker image, generating a default Dockerfile if requested."""
        full_image_name = self.reference
        # Copy so repeated builds don't mutate the stored kwargs.
        build_kwargs = self.build_kwargs.copy()
        build_kwargs["context"] = Path.cwd()
        build_kwargs["tag"] = full_image_name
        # Pull base images by default unless the caller opted out.
        build_kwargs["pull"] = build_kwargs.get("pull", True)

        if self.dockerfile == "auto":
            # "auto" means no Dockerfile was supplied; generate a temporary default one.
            with generate_default_dockerfile():
                build_image(**build_kwargs)
        else:
            build_kwargs["dockerfile"] = self.dockerfile
            build_image(**build_kwargs)

    def push(self):
        """Push the built image to its registry, raising PushError on failure."""
        with docker_client() as client:
            events = client.api.push(
                repository=self.name, tag=self.tag, stream=True, decode=True
            )
            # The push API streams decoded JSON events; surface the first error.
            for event in events:
                if "error" in event:
                    raise PushError(event["error"])
File path entrypoints are in the format: path/to/file.py:function_name.
Module path entrypoints are in the format: path.to.module.function_name.
Source code in `src/prefect/deployments/runner.py`, lines 104–113.
class EntrypointType(enum.Enum):
    """
    Enum representing an entrypoint type.

    File path entrypoints are in the format: `path/to/file.py:function_name`.
    Module path entrypoints are in the format: `path.to.module.function_name`.
    """

    FILE_PATH = "file_path"
    MODULE_PATH = "module_path"
A Prefect RunnerDeployment definition, used for specifying and building deployments.
Attributes:
Name
Type
Description
name
str
A name for the deployment (required).
version
Optional[str]
An optional version for the deployment; defaults to the flow's version
description
Optional[str]
An optional description of the deployment; defaults to the flow's
description
tags
List[str]
An optional list of tags to associate with this deployment; note that tags
are used only for organizational purposes. For delegating work to agents,
see work_queue_name.
schedule
Optional[SCHEDULE_TYPES]
A schedule to run this deployment on, once registered
is_schedule_active
Optional[bool]
Whether or not the schedule is active
parameters
Dict[str, Any]
A dictionary of parameter values to pass to runs created from this
deployment
path
Optional[str]
The path to the working directory for the workflow, relative to remote
storage or, if stored on a local filesystem, an absolute path
entrypoint
Optional[str]
The path to the entrypoint for the workflow, always relative to the
path
parameter_openapi_schema
ParameterSchema
The parameter schema of the flow, including defaults.
enforce_parameter_schema
bool
Whether or not the Prefect API should enforce the
parameter schema for this deployment.
work_pool_name
Optional[str]
The name of the work pool to use for this deployment.
work_queue_name
Optional[str]
The name of the work queue to use for this deployment's scheduled runs.
If not provided the default work queue for the work pool will be used.
job_variables
Dict[str, Any]
Settings used to override the values specified default base job template
of the chosen work pool. Refer to the base job template of the chosen work pool for
available settings.
class RunnerDeployment(BaseModel):
    """
    A Prefect RunnerDeployment definition, used for specifying and building deployments.

    Attributes:
        name: A name for the deployment (required).
        version: An optional version for the deployment; defaults to the flow's version
        description: An optional description of the deployment; defaults to the flow's
            description
        tags: An optional list of tags to associate with this deployment; note that tags
            are used only for organizational purposes. For delegating work to agents,
            see `work_queue_name`.
        schedule: A schedule to run this deployment on, once registered
        is_schedule_active: Whether or not the schedule is active
        parameters: A dictionary of parameter values to pass to runs created from this
            deployment
        path: The path to the working directory for the workflow, relative to remote
            storage or, if stored on a local filesystem, an absolute path
        entrypoint: The path to the entrypoint for the workflow, always relative to the
            `path`
        parameter_openapi_schema: The parameter schema of the flow, including defaults.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for this deployment.
        work_pool_name: The name of the work pool to use for this deployment.
        work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
            If not provided the default work queue for the work pool will be used.
        job_variables: Settings used to override the values specified default base job template
            of the chosen work pool. Refer to the base job template of the chosen work pool for
            available settings.
    """

    class Config:
        # Needed so pydantic accepts non-pydantic types like RunnerStorage.
        arbitrary_types_allowed = True

    name: str = Field(..., description="The name of the deployment.")
    flow_name: Optional[str] = Field(
        None, description="The name of the underlying flow; typically inferred."
    )
    description: Optional[str] = Field(
        default=None, description="An optional description of the deployment."
    )
    version: Optional[str] = Field(
        default=None, description="An optional version for the deployment."
    )
    tags: List[str] = Field(
        default_factory=list,
        description="One of more tags to apply to this deployment.",
    )
    schedules: Optional[List[MinimalDeploymentSchedule]] = Field(
        default=None,
        description="The schedules that should cause this deployment to run.",
    )
    schedule: Optional[SCHEDULE_TYPES] = None
    paused: Optional[bool] = Field(
        default=None, description="Whether or not the deployment is paused."
    )
    is_schedule_active: Optional[bool] = Field(
        default=None, description="DEPRECATED: Whether or not the schedule is active."
    )
    parameters: Dict[str, Any] = Field(default_factory=dict)
    entrypoint: Optional[str] = Field(
        default=None,
        description=(
            "The path to the entrypoint for the workflow, relative to the `path`."
        ),
    )
    triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]] = Field(
        default_factory=list,
        description="The triggers that should cause this deployment to run.",
    )
    enforce_parameter_schema: bool = Field(
        default=False,
        description=(
            "Whether or not the Prefect API should enforce the parameter schema for"
            " this deployment."
        ),
    )
    storage: Optional[RunnerStorage] = Field(
        default=None,
        description=(
            "The storage object used to retrieve flow code for this deployment."
        ),
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description=(
            "The name of the work pool to use for this deployment. Only used when"
            " the deployment is registered with a built runner."
        ),
    )
    work_queue_name: Optional[str] = Field(
        default=None,
        description=(
            "The name of the work queue to use for this deployment. Only used when"
            " the deployment is registered with a built runner."
        ),
    )
    job_variables: Dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Job variables used to override the default values of a work pool"
            " base job template. Only used when the deployment is registered with"
            " a built runner."
        ),
    )
    # Private (non-serialized) state set by the alternate constructors below.
    _entrypoint_type: EntrypointType = PrivateAttr(
        default=EntrypointType.FILE_PATH,
    )
    _path: Optional[str] = PrivateAttr(
        default=None,
    )
    _parameter_openapi_schema: ParameterSchema = PrivateAttr(
        default_factory=ParameterSchema,
    )

    @property
    def entrypoint_type(self) -> EntrypointType:
        return self._entrypoint_type

    @validator("triggers", allow_reuse=True)
    def validate_automation_names(cls, field_value, values):
        """Ensure that each trigger has a name for its automation if none is provided."""
        return validate_automation_names(field_value, values)

    @root_validator(pre=True)
    def reconcile_paused(cls, values):
        # Keep the deprecated `is_schedule_active` and `paused` fields consistent.
        return reconcile_paused_deployment(values)

    @root_validator(pre=True)
    def reconcile_schedules(cls, values):
        # Normalize the legacy `schedule` field and `schedules` list.
        return reconcile_schedules_runner(values)

    @sync_compatible
    async def apply(
        self, work_pool_name: Optional[str] = None, image: Optional[str] = None
    ) -> UUID:
        """
        Registers this deployment with the API and returns the deployment's ID.

        Args:
            work_pool_name: The name of the work pool to use for this
                deployment.
            image: The registry, name, and tag of the Docker image to
                use for this deployment. Only used when the deployment is
                deployed to a work pool.

        Returns:
            The ID of the created deployment.
        """
        work_pool_name = work_pool_name or self.work_pool_name

        if image and not work_pool_name:
            raise ValueError(
                "An image can only be provided when registering a deployment with a"
                " work pool."
            )

        if self.work_queue_name and not work_pool_name:
            raise ValueError(
                "A work queue can only be provided when registering a deployment with"
                " a work pool."
            )

        if self.job_variables and not work_pool_name:
            raise ValueError(
                "Job variables can only be provided when registering a deployment"
                " with a work pool."
            )

        async with get_client() as client:
            flow_id = await client.create_flow_from_name(self.flow_name)

            create_payload = dict(
                flow_id=flow_id,
                name=self.name,
                work_queue_name=self.work_queue_name,
                work_pool_name=work_pool_name,
                version=self.version,
                paused=self.paused,
                schedules=self.schedules,
                parameters=self.parameters,
                description=self.description,
                tags=self.tags,
                path=self._path,
                entrypoint=self.entrypoint,
                storage_document_id=None,
                infrastructure_document_id=None,
                parameter_openapi_schema=self._parameter_openapi_schema.dict(),
                enforce_parameter_schema=self.enforce_parameter_schema,
            )

            if work_pool_name:
                create_payload["job_variables"] = self.job_variables
                if image:
                    create_payload["job_variables"]["image"] = image
                # When storage is used the worker pulls code via pull steps,
                # so no static path is sent.
                create_payload["path"] = None if self.storage else self._path
                create_payload["pull_steps"] = (
                    [self.storage.to_pull_step()] if self.storage else []
                )

            try:
                deployment_id = await client.create_deployment(**create_payload)
            except Exception as exc:
                if isinstance(exc, PrefectHTTPStatusError):
                    detail = exc.response.json().get("detail")
                    if detail:
                        raise DeploymentApplyError(detail) from exc
                raise DeploymentApplyError(
                    f"Error while applying deployment: {str(exc)}"
                ) from exc

            if client.server_type.supports_automations():
                try:
                    # The triggers defined in the deployment spec are, essentially,
                    # anonymous and attempting to truly sync them with cloud is not
                    # feasible. Instead, we remove all automations that are owned
                    # by the deployment, meaning that they were created via this
                    # mechanism below, and then recreate them.
                    await client.delete_resource_owned_automations(
                        f"prefect.deployment.{deployment_id}"
                    )
                except PrefectHTTPStatusError as e:
                    if e.response.status_code == 404:
                        # This Prefect server does not support automations, so we can safely
                        # ignore this 404 and move on.
                        return deployment_id
                    raise e

                for trigger in self.triggers:
                    trigger.set_deployment_id(deployment_id)
                    await client.create_automation(trigger.as_automation())

            return deployment_id

    @staticmethod
    def _construct_deployment_schedules(
        interval: Optional[
            Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
        ] = None,
        anchor_date: Optional[Union[datetime, str]] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        timezone: Optional[str] = None,
        schedule: Optional[SCHEDULE_TYPES] = None,
        schedules: Optional[FlexibleScheduleList] = None,
    ) -> Union[List[MinimalDeploymentSchedule], FlexibleScheduleList]:
        """
        Construct a schedule or schedules from the provided arguments.

        This method serves as a unified interface for creating deployment
        schedules. If `schedules` is provided, it is directly returned. If
        `schedule` is provided, it is encapsulated in a list and returned. If
        `interval`, `cron`, or `rrule` are provided, they are used to construct
        schedule objects.

        Args:
            interval: An interval on which to schedule runs, either as a single
                value or as a list of values. Accepts numbers (interpreted as
                seconds) or `timedelta` objects. Each value defines a separate
                scheduling interval.
            anchor_date: The anchor date from which interval schedules should
                start. This applies to all intervals if a list is provided.
            cron: A cron expression or a list of cron expressions defining cron
                schedules. Each expression defines a separate cron schedule.
            rrule: An rrule string or a list of rrule strings for scheduling.
                Each string defines a separate recurrence rule.
            timezone: The timezone to apply to the cron or rrule schedules.
                This is a single value applied uniformly to all schedules.
            schedule: A singular schedule object, used for advanced scheduling
                options like specifying a timezone. This is returned as a list
                containing this single schedule.
            schedules: A pre-defined list of schedule objects. If provided,
                this list is returned as-is, bypassing other schedule construction
                logic.
        """
        num_schedules = sum(
            1
            for entry in (interval, cron, rrule, schedule, schedules)
            if entry is not None
        )
        if num_schedules > 1:
            raise ValueError(
                "Only one of interval, cron, rrule, schedule, or schedules can be provided."
            )
        elif num_schedules == 0:
            return []

        if schedules is not None:
            return schedules
        elif interval or cron or rrule:
            # `interval`, `cron`, and `rrule` can be lists of values. This
            # block figures out which one is not None and uses that to
            # construct the list of schedules via `construct_schedule`.
            parameters = [("interval", interval), ("cron", cron), ("rrule", rrule)]
            schedule_type, value = [
                param for param in parameters if param[1] is not None
            ][0]

            if not isiterable(value):
                value = [value]

            return [
                create_minimal_deployment_schedule(
                    construct_schedule(
                        **{
                            schedule_type: v,
                            "timezone": timezone,
                            "anchor_date": anchor_date,
                        }
                    )
                )
                for v in value
            ]
        else:
            return [create_minimal_deployment_schedule(schedule)]

    def _set_defaults_from_flow(self, flow: "Flow"):
        # Derive the parameter schema, version, and description from the flow
        # when they were not explicitly provided.
        self._parameter_openapi_schema = parameter_schema(flow)

        if not self.version:
            self.version = flow.version
        if not self.description:
            self.description = flow.description

    @classmethod
    def from_flow(
        cls,
        flow: "Flow",
        name: str,
        interval: Optional[
            Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedules: Optional[FlexibleScheduleList] = None,
        schedule: Optional[SCHEDULE_TYPES] = None,
        is_schedule_active: Optional[bool] = None,
        parameters: Optional[dict] = None,
        triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None,
        version: Optional[str] = None,
        enforce_parameter_schema: bool = False,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[Dict[str, Any]] = None,
        entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
    ) -> "RunnerDeployment":
        """
        Configure a deployment for a given flow.

        Args:
            flow: A flow function to deploy
            name: A name for the deployment
            interval: An interval on which to execute the current flow. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this flow.
            rrule: An rrule schedule of when to execute runs of this flow.
            paused: Whether or not to set this deployment as paused.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options like `timezone`.
            schedule: A schedule object of when to execute runs of this flow. Used for
                advanced scheduling options like timezone.
            is_schedule_active: Whether or not to set the schedule for this deployment as active. If
                not provided when creating a deployment, the schedule will be set as active. If not
                provided when updating a deployment, the schedule's activation will not be changed.
            triggers: A list of triggers that should kick off a run of this flow.
            parameters: A dictionary of default parameter values to pass to runs of this flow.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for this deployment.
            work_pool_name: The name of the work pool to use for this deployment.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified default base job template
                of the chosen work pool. Refer to the base job template of the chosen work pool for
                available settings.
        """
        constructed_schedules = cls._construct_deployment_schedules(
            interval=interval,
            cron=cron,
            rrule=rrule,
            schedule=schedule,
            schedules=schedules,
        )

        job_variables = job_variables or {}

        deployment = cls(
            name=Path(name).stem,
            flow_name=flow.name,
            schedule=schedule,
            schedules=constructed_schedules,
            is_schedule_active=is_schedule_active,
            paused=paused,
            tags=tags or [],
            triggers=triggers or [],
            parameters=parameters or {},
            description=description,
            version=version,
            enforce_parameter_schema=enforce_parameter_schema,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            job_variables=job_variables,
        )

        if not deployment.entrypoint:
            no_file_location_error = (
                "Flows defined interactively cannot be deployed. Check out the"
                " quickstart guide for help getting started:"
                " https://docs.prefect.io/latest/getting-started/quickstart"
            )
            ## first see if an entrypoint can be determined
            flow_file = getattr(flow, "__globals__", {}).get("__file__")
            mod_name = getattr(flow, "__module__", None)
            if entrypoint_type == EntrypointType.MODULE_PATH:
                if mod_name:
                    deployment.entrypoint = f"{mod_name}.{flow.__name__}"
                else:
                    raise ValueError(
                        "Unable to determine module path for provided flow."
                    )
            else:
                if not flow_file:
                    if not mod_name:
                        raise ValueError(no_file_location_error)
                    try:
                        module = importlib.import_module(mod_name)
                        flow_file = getattr(module, "__file__", None)
                    except ModuleNotFoundError as exc:
                        if "__prefect_loader__" in str(exc):
                            raise ValueError(
                                "Cannot create a RunnerDeployment from a flow that has been"
                                " loaded from an entrypoint. To deploy a flow via"
                                " entrypoint, use RunnerDeployment.from_entrypoint instead."
                            )
                        raise ValueError(no_file_location_error)
                    if not flow_file:
                        raise ValueError(no_file_location_error)

                # set entrypoint
                entry_path = (
                    Path(flow_file).absolute().relative_to(Path.cwd().absolute())
                )
                deployment.entrypoint = f"{entry_path}:{flow.fn.__name__}"

        # File-path entrypoints default to the current directory as the path.
        if entrypoint_type == EntrypointType.FILE_PATH and not deployment._path:
            deployment._path = "."

        deployment._entrypoint_type = entrypoint_type

        cls._set_defaults_from_flow(deployment, flow)

        return deployment

    @classmethod
    def from_entrypoint(
        cls,
        entrypoint: str,
        name: str,
        interval: Optional[
            Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedules: Optional[FlexibleScheduleList] = None,
        schedule: Optional[SCHEDULE_TYPES] = None,
        is_schedule_active: Optional[bool] = None,
        parameters: Optional[dict] = None,
        triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None,
        version: Optional[str] = None,
        enforce_parameter_schema: bool = False,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[Dict[str, Any]] = None,
    ) -> "RunnerDeployment":
        """
        Configure a deployment for a given flow located at a given entrypoint.

        Args:
            entrypoint: The path to a file containing a flow and the name of the flow function in
                the format `./path/to/file.py:flow_func_name`.
            name: A name for the deployment
            interval: An interval on which to execute the current flow. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this flow.
            rrule: An rrule schedule of when to execute runs of this flow.
            paused: Whether or not to set this deployment as paused.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options like `timezone`.
            schedule: A schedule object of when to execute runs of this flow. Used for
                advanced scheduling options like timezone.
            is_schedule_active: Whether or not to set the schedule for this deployment as active. If
                not provided when creating a deployment, the schedule will be set as active. If not
                provided when updating a deployment, the schedule's activation will not be changed.
            triggers: A list of triggers that should kick off a run of this flow.
            parameters: A dictionary of default parameter values to pass to runs of this flow.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for this deployment.
            work_pool_name: The name of the work pool to use for this deployment.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified default base job template
                of the chosen work pool. Refer to the base job template of the chosen work pool for
                available settings.
        """
        # Local import avoids a circular dependency with prefect.flows.
        from prefect.flows import load_flow_from_entrypoint

        job_variables = job_variables or {}
        flow = load_flow_from_entrypoint(entrypoint)

        constructed_schedules = cls._construct_deployment_schedules(
            interval=interval,
            cron=cron,
            rrule=rrule,
            schedule=schedule,
            schedules=schedules,
        )

        deployment = cls(
            name=Path(name).stem,
            flow_name=flow.name,
            schedule=schedule,
            schedules=constructed_schedules,
            paused=paused,
            is_schedule_active=is_schedule_active,
            tags=tags or [],
            triggers=triggers or [],
            parameters=parameters or {},
            description=description,
            version=version,
            entrypoint=entrypoint,
            enforce_parameter_schema=enforce_parameter_schema,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            job_variables=job_variables,
        )
        deployment._path = str(Path.cwd())

        cls._set_defaults_from_flow(deployment, flow)

        return deployment

    @classmethod
    @sync_compatible
    async def from_storage(
        cls,
        storage: RunnerStorage,
        entrypoint: str,
        name: str,
        interval: Optional[
            Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedules: Optional[FlexibleScheduleList] = None,
        schedule: Optional[SCHEDULE_TYPES] = None,
        is_schedule_active: Optional[bool] = None,
        parameters: Optional[dict] = None,
        triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None,
        version: Optional[str] = None,
        enforce_parameter_schema: bool = False,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[Dict[str, Any]] = None,
    ):
        """
        Create a RunnerDeployment from a flow located at a given entrypoint and stored in a
        local storage location.

        Args:
            entrypoint: The path to a file containing a flow and the name of the flow function in
                the format `./path/to/file.py:flow_func_name`.
            name: A name for the deployment
            storage: A storage object to use for retrieving flow code. If not provided, a
                URL must be provided.
            interval: An interval on which to execute the current flow. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this flow.
            rrule: An rrule schedule of when to execute runs of this flow.
            schedule: A schedule object of when to execute runs of this flow. Used for
                advanced scheduling options like timezone.
            is_schedule_active: Whether or not to set the schedule for this deployment as active. If
                not provided when creating a deployment, the schedule will be set as active. If not
                provided when updating a deployment, the schedule's activation will not be changed.
            triggers: A list of triggers that should kick off a run of this flow.
            parameters: A dictionary of default parameter values to pass to runs of this flow.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for this deployment.
            work_pool_name: The name of the work pool to use for this deployment.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified default base job template
                of the chosen work pool. Refer to the base job template of the chosen work pool for
                available settings.
        """
        # Local import avoids a circular dependency with prefect.flows.
        from prefect.flows import load_flow_from_entrypoint

        constructed_schedules = cls._construct_deployment_schedules(
            interval=interval,
            cron=cron,
            rrule=rrule,
            schedule=schedule,
            schedules=schedules,
        )

        job_variables = job_variables or {}

        # Pull the code into a temporary directory just long enough to load the
        # flow and derive its metadata; the runner re-pulls code at run time.
        with tempfile.TemporaryDirectory() as tmpdir:
            storage.set_base_path(Path(tmpdir))
            await storage.pull_code()

            full_entrypoint = str(storage.destination / entrypoint)
            flow = await from_async.wait_for_call_in_new_thread(
                create_call(load_flow_from_entrypoint, full_entrypoint)
            )

        deployment = cls(
            name=Path(name).stem,
            flow_name=flow.name,
            schedule=schedule,
            schedules=constructed_schedules,
            paused=paused,
            is_schedule_active=is_schedule_active,
            tags=tags or [],
            triggers=triggers or [],
            parameters=parameters or {},
            description=description,
            version=version,
            entrypoint=entrypoint,
            enforce_parameter_schema=enforce_parameter_schema,
            storage=storage,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            job_variables=job_variables,
        )
        # Record the storage destination relative to a placeholder base path so
        # the runner can substitute its own base path later.
        deployment._path = str(storage.destination).replace(
            tmpdir, "$STORAGE_BASE_PATH"
        )

        cls._set_defaults_from_flow(deployment, flow)

        return deployment
@sync_compatible
async def apply(
    self, work_pool_name: Optional[str] = None, image: Optional[str] = None
) -> UUID:
    """
    Registers this deployment with the API and returns the deployment's ID.

    Args:
        work_pool_name: The name of the work pool to use for this
            deployment.
        image: The registry, name, and tag of the Docker image to
            use for this deployment. Only used when the deployment is
            deployed to a work pool.

    Returns:
        The ID of the created deployment.
    """
    work_pool_name = work_pool_name or self.work_pool_name

    if image and not work_pool_name:
        raise ValueError(
            "An image can only be provided when registering a deployment with a"
            " work pool."
        )

    if self.work_queue_name and not work_pool_name:
        raise ValueError(
            "A work queue can only be provided when registering a deployment with"
            " a work pool."
        )

    if self.job_variables and not work_pool_name:
        raise ValueError(
            "Job variables can only be provided when registering a deployment"
            " with a work pool."
        )

    async with get_client() as client:
        flow_id = await client.create_flow_from_name(self.flow_name)

        create_payload = dict(
            flow_id=flow_id,
            name=self.name,
            work_queue_name=self.work_queue_name,
            work_pool_name=work_pool_name,
            version=self.version,
            paused=self.paused,
            schedules=self.schedules,
            parameters=self.parameters,
            description=self.description,
            tags=self.tags,
            path=self._path,
            entrypoint=self.entrypoint,
            storage_document_id=None,
            infrastructure_document_id=None,
            parameter_openapi_schema=self._parameter_openapi_schema.dict(),
            enforce_parameter_schema=self.enforce_parameter_schema,
        )

        if work_pool_name:
            create_payload["job_variables"] = self.job_variables
            if image:
                create_payload["job_variables"]["image"] = image
            # When storage is used the worker pulls code via pull steps,
            # so no static path is sent.
            create_payload["path"] = None if self.storage else self._path
            create_payload["pull_steps"] = (
                [self.storage.to_pull_step()] if self.storage else []
            )

        try:
            deployment_id = await client.create_deployment(**create_payload)
        except Exception as exc:
            if isinstance(exc, PrefectHTTPStatusError):
                detail = exc.response.json().get("detail")
                if detail:
                    raise DeploymentApplyError(detail) from exc
            raise DeploymentApplyError(
                f"Error while applying deployment: {str(exc)}"
            ) from exc

        if client.server_type.supports_automations():
            try:
                # The triggers defined in the deployment spec are, essentially,
                # anonymous and attempting to truly sync them with cloud is not
                # feasible. Instead, we remove all automations that are owned
                # by the deployment, meaning that they were created via this
                # mechanism below, and then recreate them.
                await client.delete_resource_owned_automations(
                    f"prefect.deployment.{deployment_id}"
                )
            except PrefectHTTPStatusError as e:
                if e.response.status_code == 404:
                    # This Prefect server does not support automations, so we can safely
                    # ignore this 404 and move on.
                    return deployment_id
                raise e

            for trigger in self.triggers:
                trigger.set_deployment_id(deployment_id)
                await client.create_automation(trigger.as_automation())

        return deployment_id
An interval on which to execute the current flow. Accepts either a number
or a timedelta object. If a number is given, it will be interpreted as seconds.
None
cron
Optional[Union[Iterable[str], str]]
A cron schedule of when to execute runs of this flow.
None
rrule
Optional[Union[Iterable[str], str]]
An rrule schedule of when to execute runs of this flow.
None
paused
Optional[bool]
Whether or not to set this deployment as paused.
None
schedules
Optional[FlexibleScheduleList]
A list of schedule objects defining when to execute runs of this deployment.
Used to define multiple schedules or additional scheduling options like timezone.
None
schedule
Optional[SCHEDULE_TYPES]
A schedule object of when to execute runs of this flow. Used for
advanced scheduling options like timezone.
None
is_schedule_active
Optional[bool]
Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
A list of triggers that should kick off a run of this flow.
None
parameters
Optional[dict]
A dictionary of default parameter values to pass to runs of this flow.
None
description
Optional[str]
A description for the created deployment. Defaults to the flow's
description if not provided.
None
tags
Optional[List[str]]
A list of tags to associate with the created deployment for organizational
purposes.
None
version
Optional[str]
A version for the created deployment. Defaults to the flow's version.
None
enforce_parameter_schema
bool
Whether or not the Prefect API should enforce the
parameter schema for this deployment.
False
work_pool_name
Optional[str]
The name of the work pool to use for this deployment.
None
work_queue_name
Optional[str]
The name of the work queue to use for this deployment's scheduled runs.
If not provided the default work queue for the work pool will be used.
None
job_variables
Optional[Dict[str, Any]]
Settings used to override the values specified default base job template
of the chosen work pool. Refer to the base job template of the chosen work pool for
available settings.
@classmethod
def from_entrypoint(
    cls,
    entrypoint: str,
    name: str,
    interval: Optional[
        Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
    ] = None,
    cron: Optional[Union[Iterable[str], str]] = None,
    rrule: Optional[Union[Iterable[str], str]] = None,
    paused: Optional[bool] = None,
    schedules: Optional[FlexibleScheduleList] = None,
    schedule: Optional[SCHEDULE_TYPES] = None,
    is_schedule_active: Optional[bool] = None,
    parameters: Optional[dict] = None,
    triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    version: Optional[str] = None,
    enforce_parameter_schema: bool = False,
    work_pool_name: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    job_variables: Optional[Dict[str, Any]] = None,
) -> "RunnerDeployment":
    """
    Configure a deployment for a given flow located at a given entrypoint.

    Args:
        entrypoint:  The path to a file containing a flow and the name of the flow function in
            the format `./path/to/file.py:flow_func_name`.
        name: A name for the deployment
        interval: An interval on which to execute the current flow. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        cron: A cron schedule of when to execute runs of this flow.
        rrule: An rrule schedule of when to execute runs of this flow.
        paused: Whether or not to set this deployment as paused.
        schedules: A list of schedule objects defining when to execute runs of this deployment.
            Used to define multiple schedules or additional scheduling options like `timezone`.
        schedule: A schedule object of when to execute runs of this flow. Used for
            advanced scheduling options like timezone.
        is_schedule_active: Whether or not to set the schedule for this deployment as active. If
            not provided when creating a deployment, the schedule will be set as active. If not
            provided when updating a deployment, the schedule's activation will not be changed.
        triggers: A list of triggers that should kick off a run of this flow.
        parameters: A dictionary of default parameter values to pass to runs of this flow.
        description: A description for the created deployment. Defaults to the flow's
            description if not provided.
        tags: A list of tags to associate with the created deployment for organizational
            purposes.
        version: A version for the created deployment. Defaults to the flow's version.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for this deployment.
        work_pool_name: The name of the work pool to use for this deployment.
        work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
            If not provided the default work queue for the work pool will be used.
        job_variables: Settings used to override the values specified in the default base job
            template of the chosen work pool. Refer to the base job template of the chosen
            work pool for available settings.
    """
    # Imported here to avoid a circular import with prefect.flows.
    from prefect.flows import load_flow_from_entrypoint

    job_variables = job_variables or {}
    # Load the flow up front so we can derive its name and defaults.
    flow = load_flow_from_entrypoint(entrypoint)

    constructed_schedules = cls._construct_deployment_schedules(
        interval=interval,
        cron=cron,
        rrule=rrule,
        schedule=schedule,
        schedules=schedules,
    )

    deployment = cls(
        # Path(name).stem strips any file extension a caller may have included.
        name=Path(name).stem,
        flow_name=flow.name,
        schedule=schedule,
        schedules=constructed_schedules,
        paused=paused,
        is_schedule_active=is_schedule_active,
        tags=tags or [],
        triggers=triggers or [],
        parameters=parameters or {},
        description=description,
        version=version,
        entrypoint=entrypoint,
        enforce_parameter_schema=enforce_parameter_schema,
        work_pool_name=work_pool_name,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
    )
    # Entrypoint paths are resolved relative to the current working directory.
    deployment._path = str(Path.cwd())

    cls._set_defaults_from_flow(deployment, flow)

    return deployment
An interval on which to execute the current flow. Accepts either a number
or a timedelta object. If a number is given, it will be interpreted as seconds.
None
cron
Optional[Union[Iterable[str], str]]
A cron schedule of when to execute runs of this flow.
None
rrule
Optional[Union[Iterable[str], str]]
An rrule schedule of when to execute runs of this flow.
None
paused
Optional[bool]
Whether or not to set this deployment as paused.
None
schedules
Optional[FlexibleScheduleList]
A list of schedule objects defining when to execute runs of this deployment.
Used to define multiple schedules or additional scheduling options like timezone.
None
schedule
Optional[SCHEDULE_TYPES]
A schedule object of when to execute runs of this flow. Used for
advanced scheduling options like timezone.
None
is_schedule_active
Optional[bool]
Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
A list of triggers that should kick off a run of this flow.
None
parameters
Optional[dict]
A dictionary of default parameter values to pass to runs of this flow.
None
description
Optional[str]
A description for the created deployment. Defaults to the flow's
description if not provided.
None
tags
Optional[List[str]]
A list of tags to associate with the created deployment for organizational
purposes.
None
version
Optional[str]
A version for the created deployment. Defaults to the flow's version.
None
enforce_parameter_schema
bool
Whether or not the Prefect API should enforce the
parameter schema for this deployment.
False
work_pool_name
Optional[str]
The name of the work pool to use for this deployment.
None
work_queue_name
Optional[str]
The name of the work queue to use for this deployment's scheduled runs.
If not provided the default work queue for the work pool will be used.
None
job_variables
Optional[Dict[str, Any]]
Settings used to override the values specified in the default base job template
of the chosen work pool. Refer to the base job template of the chosen work pool for
available settings.
@classmethod
def from_flow(
    cls,
    flow: "Flow",
    name: str,
    interval: Optional[
        Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
    ] = None,
    cron: Optional[Union[Iterable[str], str]] = None,
    rrule: Optional[Union[Iterable[str], str]] = None,
    paused: Optional[bool] = None,
    schedules: Optional[FlexibleScheduleList] = None,
    schedule: Optional[SCHEDULE_TYPES] = None,
    is_schedule_active: Optional[bool] = None,
    parameters: Optional[dict] = None,
    triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    version: Optional[str] = None,
    enforce_parameter_schema: bool = False,
    work_pool_name: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    job_variables: Optional[Dict[str, Any]] = None,
    entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
) -> "RunnerDeployment":
    """
    Configure a deployment for a given flow.

    Args:
        flow: A flow function to deploy
        name: A name for the deployment
        interval: An interval on which to execute the current flow. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        cron: A cron schedule of when to execute runs of this flow.
        rrule: An rrule schedule of when to execute runs of this flow.
        paused: Whether or not to set this deployment as paused.
        schedules: A list of schedule objects defining when to execute runs of this deployment.
            Used to define multiple schedules or additional scheduling options like `timezone`.
        schedule: A schedule object of when to execute runs of this flow. Used for
            advanced scheduling options like timezone.
        is_schedule_active: Whether or not to set the schedule for this deployment as active. If
            not provided when creating a deployment, the schedule will be set as active. If not
            provided when updating a deployment, the schedule's activation will not be changed.
        triggers: A list of triggers that should kick off a run of this flow.
        parameters: A dictionary of default parameter values to pass to runs of this flow.
        description: A description for the created deployment. Defaults to the flow's
            description if not provided.
        tags: A list of tags to associate with the created deployment for organizational
            purposes.
        version: A version for the created deployment. Defaults to the flow's version.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for this deployment.
        work_pool_name: The name of the work pool to use for this deployment.
        work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
            If not provided the default work queue for the work pool will be used.
        job_variables: Settings used to override the values specified in the default base job
            template of the chosen work pool. Refer to the base job template of the chosen
            work pool for available settings.
    """
    constructed_schedules = cls._construct_deployment_schedules(
        interval=interval,
        cron=cron,
        rrule=rrule,
        schedule=schedule,
        schedules=schedules,
    )

    job_variables = job_variables or {}

    deployment = cls(
        # Path(name).stem strips any file extension a caller may have included.
        name=Path(name).stem,
        flow_name=flow.name,
        schedule=schedule,
        schedules=constructed_schedules,
        is_schedule_active=is_schedule_active,
        paused=paused,
        tags=tags or [],
        triggers=triggers or [],
        parameters=parameters or {},
        description=description,
        version=version,
        enforce_parameter_schema=enforce_parameter_schema,
        work_pool_name=work_pool_name,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
    )

    if not deployment.entrypoint:
        no_file_location_error = (
            "Flows defined interactively cannot be deployed. Check out the"
            " quickstart guide for help getting started:"
            " https://docs.prefect.io/latest/getting-started/quickstart"
        )
        ## first see if an entrypoint can be determined
        flow_file = getattr(flow, "__globals__", {}).get("__file__")
        mod_name = getattr(flow, "__module__", None)
        if entrypoint_type == EntrypointType.MODULE_PATH:
            if mod_name:
                deployment.entrypoint = f"{mod_name}.{flow.__name__}"
            else:
                raise ValueError(
                    "Unable to determine module path for provided flow."
                )
        else:
            if not flow_file:
                if not mod_name:
                    raise ValueError(no_file_location_error)
                try:
                    # Fall back to the module's __file__ when the flow's own
                    # globals don't expose a file location.
                    module = importlib.import_module(mod_name)
                    flow_file = getattr(module, "__file__", None)
                except ModuleNotFoundError as exc:
                    if "__prefect_loader__" in str(exc):
                        raise ValueError(
                            "Cannot create a RunnerDeployment from a flow that has been"
                            " loaded from an entrypoint. To deploy a flow via"
                            " entrypoint, use RunnerDeployment.from_entrypoint instead."
                        )
                    raise ValueError(no_file_location_error)
                if not flow_file:
                    raise ValueError(no_file_location_error)

            # set entrypoint
            entry_path = (
                Path(flow_file).absolute().relative_to(Path.cwd().absolute())
            )
            deployment.entrypoint = f"{entry_path}:{flow.fn.__name__}"

    if entrypoint_type == EntrypointType.FILE_PATH and not deployment._path:
        deployment._path = "."

    deployment._entrypoint_type = entrypoint_type

    cls._set_defaults_from_flow(deployment, flow)

    return deployment
An interval on which to execute the current flow. Accepts either a number
or a timedelta object. If a number is given, it will be interpreted as seconds.
None
cron
Optional[Union[Iterable[str], str]]
A cron schedule of when to execute runs of this flow.
None
rrule
Optional[Union[Iterable[str], str]]
An rrule schedule of when to execute runs of this flow.
None
schedule
Optional[SCHEDULE_TYPES]
A schedule object of when to execute runs of this flow. Used for
advanced scheduling options like timezone.
None
is_schedule_active
Optional[bool]
Whether or not to set the schedule for this deployment as active. If
not provided when creating a deployment, the schedule will be set as active. If not
provided when updating a deployment, the schedule's activation will not be changed.
A list of triggers that should kick off a run of this flow.
None
parameters
Optional[dict]
A dictionary of default parameter values to pass to runs of this flow.
None
description
Optional[str]
A description for the created deployment. Defaults to the flow's
description if not provided.
None
tags
Optional[List[str]]
A list of tags to associate with the created deployment for organizational
purposes.
None
version
Optional[str]
A version for the created deployment. Defaults to the flow's version.
None
enforce_parameter_schema
bool
Whether or not the Prefect API should enforce the
parameter schema for this deployment.
False
work_pool_name
Optional[str]
The name of the work pool to use for this deployment.
None
work_queue_name
Optional[str]
The name of the work queue to use for this deployment's scheduled runs.
If not provided the default work queue for the work pool will be used.
None
job_variables
Optional[Dict[str, Any]]
Settings used to override the values specified in the default base job template
of the chosen work pool. Refer to the base job template of the chosen work pool for
available settings.
@classmethod
@sync_compatible
async def from_storage(
    cls,
    storage: RunnerStorage,
    entrypoint: str,
    name: str,
    interval: Optional[
        Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
    ] = None,
    cron: Optional[Union[Iterable[str], str]] = None,
    rrule: Optional[Union[Iterable[str], str]] = None,
    paused: Optional[bool] = None,
    schedules: Optional[FlexibleScheduleList] = None,
    schedule: Optional[SCHEDULE_TYPES] = None,
    is_schedule_active: Optional[bool] = None,
    parameters: Optional[dict] = None,
    triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    version: Optional[str] = None,
    enforce_parameter_schema: bool = False,
    work_pool_name: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    job_variables: Optional[Dict[str, Any]] = None,
):
    """
    Create a RunnerDeployment from a flow located at a given entrypoint and stored in a
    local storage location.

    Args:
        entrypoint:  The path to a file containing a flow and the name of the flow function in
            the format `./path/to/file.py:flow_func_name`.
        name: A name for the deployment
        storage: A storage object to use for retrieving flow code. If not provided, a
            URL must be provided.
        interval: An interval on which to execute the current flow. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        cron: A cron schedule of when to execute runs of this flow.
        rrule: An rrule schedule of when to execute runs of this flow.
        paused: Whether or not to set this deployment as paused.
        schedules: A list of schedule objects defining when to execute runs of this deployment.
            Used to define multiple schedules or additional scheduling options like `timezone`.
        schedule: A schedule object of when to execute runs of this flow. Used for
            advanced scheduling options like timezone.
        is_schedule_active: Whether or not to set the schedule for this deployment as active. If
            not provided when creating a deployment, the schedule will be set as active. If not
            provided when updating a deployment, the schedule's activation will not be changed.
        triggers: A list of triggers that should kick off a run of this flow.
        parameters: A dictionary of default parameter values to pass to runs of this flow.
        description: A description for the created deployment. Defaults to the flow's
            description if not provided.
        tags: A list of tags to associate with the created deployment for organizational
            purposes.
        version: A version for the created deployment. Defaults to the flow's version.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for this deployment.
        work_pool_name: The name of the work pool to use for this deployment.
        work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
            If not provided the default work queue for the work pool will be used.
        job_variables: Settings used to override the values specified in the default base job
            template of the chosen work pool. Refer to the base job template of the chosen
            work pool for available settings.
    """
    # Imported here to avoid a circular import with prefect.flows.
    from prefect.flows import load_flow_from_entrypoint

    constructed_schedules = cls._construct_deployment_schedules(
        interval=interval,
        cron=cron,
        rrule=rrule,
        schedule=schedule,
        schedules=schedules,
    )

    job_variables = job_variables or {}

    with tempfile.TemporaryDirectory() as tmpdir:
        # Pull the flow code into a temporary directory so it can be loaded
        # for inspection; the directory is discarded afterwards.
        storage.set_base_path(Path(tmpdir))
        await storage.pull_code()

        full_entrypoint = str(storage.destination / entrypoint)
        # load_flow_from_entrypoint is synchronous; run it in a worker thread
        # so we don't block the event loop.
        flow = await from_async.wait_for_call_in_new_thread(
            create_call(load_flow_from_entrypoint, full_entrypoint)
        )

        deployment = cls(
            # Path(name).stem strips any file extension a caller may have included.
            name=Path(name).stem,
            flow_name=flow.name,
            schedule=schedule,
            schedules=constructed_schedules,
            paused=paused,
            is_schedule_active=is_schedule_active,
            tags=tags or [],
            triggers=triggers or [],
            parameters=parameters or {},
            description=description,
            version=version,
            entrypoint=entrypoint,
            enforce_parameter_schema=enforce_parameter_schema,
            storage=storage,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            job_variables=job_variables,
        )
        # Record the storage destination relative to a placeholder base path so
        # the deployment is not tied to this (temporary) local directory.
        deployment._path = str(storage.destination).replace(
            tmpdir, "$STORAGE_BASE_PATH"
        )

    cls._set_defaults_from_flow(deployment, flow)

    return deployment
Ensure that each trigger has a name for its automation if none is provided.
Source code in src/prefect/deployments/runner.py
235236237238
@validator("triggers", allow_reuse=True)
def validate_automation_names(cls, field_value, values):
    """Ensure that each trigger has a name for its automation if none is provided."""
    # Delegates to the module-level `validate_automation_names` helper; the
    # method name shadows it only inside the class namespace.
    return validate_automation_names(field_value, values)
Deploy the provided list of deployments to dynamic infrastructure via a
work pool.
By default, calling this function will build a Docker image for the deployments, push it to a
registry, and create each deployment via the Prefect API that will run the corresponding
flow on the given schedule.
If you want to use an existing image, you can pass build=False to skip building and pushing
an image.
The name of the Docker image to build, including the registry and
repository. Pass a DeploymentImage instance to customize the Dockerfile used
and build arguments.
None
build
bool
Whether or not to build a new image for the flow. If False, the provided
image will be used as-is and pulled at runtime.
True
push
bool
Whether or not to push the built image to a registry.
True
print_next_steps_message
bool
Whether or not to print a message with next steps
after deploying the deployments.
True
Returns:
Type
Description
List[UUID]
A list of deployment IDs for the created/updated deployments.
Examples:
Deploy a group of flows to a work pool:
from prefect import deploy, flow


@flow(log_prints=True)
def local_flow():
    print("I'm a locally defined flow!")


if __name__ == "__main__":
    # Deploy one locally defined flow and one flow pulled from a git repo
    # to the same work pool, sharing a single Docker image.
    deploy(
        local_flow.to_deployment(name="example-deploy-local-flow"),
        flow.from_source(
            source="https://github.com/org/repo.git",
            entrypoint="flows.py:my_flow",
        ).to_deployment(
            name="example-deploy-remote-flow",
        ),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
@sync_compatible
async def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DeploymentImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]:
    """
    Deploy the provided list of deployments to dynamic infrastructure via a
    work pool.

    By default, calling this function will build a Docker image for the deployments, push it to a
    registry, and create each deployment via the Prefect API that will run the corresponding
    flow on the given schedule.

    If you want to use an existing image, you can pass `build=False` to skip building and pushing
    an image.

    Args:
        *deployments: A list of deployments to deploy.
        work_pool_name: The name of the work pool to use for these deployments. Defaults to
            the value of `PREFECT_DEFAULT_WORK_POOL_NAME`.
        image: The name of the Docker image to build, including the registry and
            repository. Pass a DeploymentImage instance to customize the Dockerfile used
            and build arguments.
        build: Whether or not to build a new image for the flow. If False, the provided
            image will be used as-is and pulled at runtime.
        push: Whether or not to push the built image to a registry. Only applies when
            an image is built.
        print_next_steps_message: Whether or not to print a message with next steps
            after deploying the deployments.
        ignore_warnings: Whether or not to suppress warnings, such as the advisory
            printed when deploying to a process work pool.

    Returns:
        A list of deployment IDs for the created/updated deployments.

    Examples:
        Deploy a group of flows to a work pool:

        ```python
        from prefect import deploy, flow

        @flow(log_prints=True)
        def local_flow():
            print("I'm a locally defined flow!")

        if __name__ == "__main__":
            deploy(
                local_flow.to_deployment(name="example-deploy-local-flow"),
                flow.from_source(
                    source="https://github.com/org/repo.git",
                    entrypoint="flows.py:my_flow",
                ).to_deployment(
                    name="example-deploy-remote-flow",
                ),
                work_pool_name="my-work-pool",
                image="my-registry/my-image:dev",
            )
        ```
    """
    work_pool_name = work_pool_name or PREFECT_DEFAULT_WORK_POOL_NAME.value()

    # Without an image, every deployment must be runnable from remote storage
    # or an importable module path.
    if not image and not all(
        d.storage or d.entrypoint_type == EntrypointType.MODULE_PATH
        for d in deployments
    ):
        raise ValueError(
            "Either an image or remote storage location must be provided when deploying"
            " a deployment."
        )

    if not work_pool_name:
        raise ValueError(
            "A work pool name must be provided when deploying a deployment. Either"
            " provide a work pool name when calling `deploy` or set"
            " `PREFECT_DEFAULT_WORK_POOL_NAME` in your profile."
        )

    # Normalize a plain image string into a DeploymentImage.
    if image and isinstance(image, str):
        image_name, image_tag = parse_image_tag(image)
        image = DeploymentImage(name=image_name, tag=image_tag)

    try:
        async with get_client() as client:
            work_pool = await client.read_work_pool(work_pool_name)
    except ObjectNotFound as exc:
        raise ValueError(
            f"Could not find work pool {work_pool_name!r}. Please create it before"
            " deploying this flow."
        ) from exc

    is_docker_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.image", False
    )
    is_block_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.block", False
    )
    # carve out an exception for block based work pools that only have a block in their base job template
    console = Console()
    if not is_docker_based_work_pool and not is_block_based_work_pool:
        if image:
            raise ValueError(
                f"Work pool {work_pool_name!r} does not support custom Docker images."
                " Please use a work pool with an `image` variable in its base job template"
                " or specify a remote storage location for the flow with `.from_source`."
                " If you are attempting to deploy a flow to a local process work pool,"
                " consider using `flow.serve` instead. See the documentation for more"
                " information: https://docs.prefect.io/latest/concepts/flows/#serving-a-flow"
            )
        elif work_pool.type == "process" and not ignore_warnings:
            console.print(
                "Looks like you're deploying to a process work pool. If you're creating a"
                " deployment for local development, calling `.serve` on your flow is a great"
                " way to get started. See the documentation for more information:"
                " https://docs.prefect.io/latest/concepts/flows/#serving-a-flow."
                " Set `ignore_warnings=True` to suppress this message.",
                style="yellow",
            )

    # Managed pools build and push images on the server side.
    is_managed_pool = work_pool.is_managed_pool
    if is_managed_pool:
        build = False
        push = False

    if image and build:
        with Progress(
            SpinnerColumn(),
            TextColumn(f"Building image {image.reference}..."),
            transient=True,
            console=console,
        ) as progress:
            docker_build_task = progress.add_task("docker_build", total=1)
            image.build()

            progress.update(docker_build_task, completed=1)
            console.print(
                f"Successfully built image {image.reference!r}", style="green"
            )

    if image and build and push:
        with Progress(
            SpinnerColumn(),
            TextColumn("Pushing image..."),
            transient=True,
            console=console,
        ) as progress:
            docker_push_task = progress.add_task("docker_push", total=1)
            image.push()

            progress.update(docker_push_task, completed=1)
            console.print(
                f"Successfully pushed image {image.reference!r}", style="green"
            )

    deployment_exceptions = []
    deployment_ids = []
    image_ref = image.reference if image else None
    for deployment in track(
        deployments,
        description="Creating/updating deployments...",
        console=console,
        transient=True,
    ):
        try:
            deployment_ids.append(
                await deployment.apply(image=image_ref, work_pool_name=work_pool_name)
            )
        except Exception as exc:
            # With a single deployment, surface the failure directly; with
            # several, collect failures and report them in the summary table.
            if len(deployments) == 1:
                raise
            deployment_exceptions.append({"deployment": deployment, "exc": exc})

    if deployment_exceptions:
        console.print(
            "Encountered errors while creating/updating deployments:\n",
            style="orange_red1",
        )
    else:
        console.print("Successfully created/updated all deployments!\n", style="green")

    complete_failure = len(deployment_exceptions) == len(deployments)

    table = Table(
        title="Deployments",
        show_lines=True,
    )

    table.add_column(header="Name", style="blue", no_wrap=True)
    table.add_column(header="Status", style="blue", no_wrap=True)
    table.add_column(header="Details", style="blue")

    for deployment in deployments:
        errored_deployment = next(
            (d for d in deployment_exceptions if d["deployment"] == deployment),
            None,
        )
        if errored_deployment:
            table.add_row(
                f"{deployment.flow_name}/{deployment.name}",
                "failed",
                str(errored_deployment["exc"]),
                style="red",
            )
        else:
            table.add_row(f"{deployment.flow_name}/{deployment.name}", "applied")
    console.print(table)

    if print_next_steps_message and not complete_failure:
        if not work_pool.is_push_pool and not work_pool.is_managed_pool:
            console.print(
                "\nTo execute flow runs from these deployments, start a worker in a"
                " separate terminal that pulls work from the"
                f" {work_pool_name!r} work pool:"
            )
            console.print(
                f"\n\t$ prefect worker start --pool {work_pool_name!r}",
                style="blue",
            )
        console.print(
            "\nTo trigger any of these deployments, use the"
            " following command:\n[blue]\n\t$ prefect deployment run"
            " [DEPLOYMENT_NAME]\n[/]"
        )

        if PREFECT_UI_URL:
            console.print(
                "\nYou can also trigger your deployments via the Prefect UI:"
                f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n"
            )

    return deployment_ids