A Prefect Deployment definition, used for specifying and building deployments.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | A name for the deployment (required). | *required* |
| `version` | `Optional[str]` | An optional version for the deployment; defaults to the flow's version. | `None` |
| `description` | `Optional[str]` | An optional description of the deployment; defaults to the flow's description. | `None` |
| `tags` | `List[str]` | An optional list of tags to associate with this deployment; note that tags are used only for organizational purposes. For delegating work to agents, see `work_queue_name`. | `[]` |
| `schedule` | `SCHEDULE_TYPES` | A schedule to run this deployment on, once registered. | `None` |
| `is_schedule_active` | `Optional[bool]` | Whether or not the schedule is active. | `None` |
| `work_queue_name` | `Optional[str]` | The work queue that will handle this deployment's runs. | `'default'` |
| `flow_name` | `Optional[str]` | The name of the flow this deployment encapsulates. | `None` |
| `parameters` | `Dict[str, Any]` | A dictionary of parameter values to pass to runs created from this deployment. | `{}` |
| `infrastructure` | `Infrastructure` | An optional infrastructure block used to configure infrastructure for runs; if not provided, will default to running this deployment in Agent subprocesses. | `Process()` |
| `infra_overrides` | `Dict[str, Any]` | A dictionary of dot-delimited infrastructure overrides that will be applied at runtime; for example `env.CONFIG_KEY=config_value` or `namespace='prefect'`. | `{}` |
| `storage` | `Optional[Block]` | An optional remote storage block used to store and retrieve this workflow; if not provided, will default to referencing this flow by its local path. | `None` |
| `path` | `Optional[str]` | The path to the working directory for the workflow, relative to remote storage or, if stored on a local filesystem, an absolute path. | `None` |
| `entrypoint` | `Optional[str]` | The path to the entrypoint for the workflow, always relative to `path`. | `None` |
| `parameter_openapi_schema` | `ParameterSchema` | The parameter schema of the flow, including defaults. | `ParameterSchema()` |
Examples:
Create a new deployment using configuration defaults for an imported flow:
```python
from my_project.flows import my_flow
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="example",
    version="1",
    tags=["demo"],
)
deployment.apply()
```

Create a new deployment with custom storage and an infrastructure override:

```python
from my_project.flows import my_flow
from prefect.deployments import Deployment
from prefect.filesystems import S3

storage = S3.load("dev-bucket")  # load a pre-defined block

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="s3-example",
    version="2",
    tags=["aws"],
    storage=storage,
    infra_overrides={"env.PREFECT_LOGGING_LEVEL": "DEBUG"},
)
deployment.apply()
```

Source code in src/prefect/deployments.py

```python
@experimental_field(
    "work_pool_name",
    group="work_pools",
    when=lambda x: x is not None and x != DEFAULT_AGENT_WORK_POOL_NAME,
)
class Deployment(BaseModel):
    """
    A Prefect Deployment definition, used for specifying and building deployments.
    """

    class Config:
        json_encoders = {SecretDict: lambda v: v.dict()}
        validate_assignment = True
        extra = "forbid"

    @property
    def _editable_fields(self) -> List[str]:
        editable_fields = [
            "name",
            "description",
            "version",
            "work_queue_name",
            "work_pool_name",
            "tags",
            "parameters",
            "schedule",
            "is_schedule_active",
            "infra_overrides",
        ]

        # if infrastructure is baked as a pre-saved block, then
        # editing its fields will not update anything
        if self.infrastructure._block_document_id:
            return editable_fields
        else:
            return editable_fields + ["infrastructure"]

    @property
    def location(self) -> str:
        """
        The 'location' that this deployment points to is given by `path` alone
        in the case of no remote storage, and otherwise by `storage.basepath / path`.

        The underlying flow entrypoint is interpreted relative to this location.
        """
        location = ""
        if self.storage:
            location = (
                self.storage.basepath + "/"
                if not self.storage.basepath.endswith("/")
                else ""
            )
        if self.path:
            location += self.path
        return location

    @sync_compatible
    async def to_yaml(self, path: Path) -> None:
        yaml_dict = self._yaml_dict()
        schema = self.schema()

        with open(path, "w") as f:
            # write header
            f.write(
                "###\n### A complete description of a Prefect Deployment for flow"
                f" {self.flow_name!r}\n###\n"
            )

            # write editable fields
            for field in self._editable_fields:
                # write any comments
                if schema["properties"][field].get("yaml_comment"):
                    f.write(f"# {schema['properties'][field]['yaml_comment']}\n")
                # write the field
                yaml.dump({field: yaml_dict[field]}, f, sort_keys=False)

            # write non-editable fields
            f.write("\n###\n### DO NOT EDIT BELOW THIS LINE\n###\n")
            yaml.dump(
                {
                    k: v
                    for k, v in yaml_dict.items()
                    if k not in self._editable_fields
                },
                f,
                sort_keys=False,
            )

    def _yaml_dict(self) -> dict:
        """
        Returns a YAML-compatible representation of this deployment as a dictionary.
        """
        # avoids issues with UUIDs showing up in YAML
        all_fields = json.loads(
            self.json(
                exclude={
                    "storage": {"_filesystem", "filesystem", "_remote_file_system"}
                }
            )
        )
        if all_fields["storage"]:
            all_fields["storage"][
                "_block_type_slug"
            ] = self.storage.get_block_type_slug()
        if all_fields["infrastructure"]:
            all_fields["infrastructure"][
                "_block_type_slug"
            ] = self.infrastructure.get_block_type_slug()
        return all_fields

    # top level metadata
    name: str = Field(..., description="The name of the deployment.")
    description: Optional[str] = Field(
        default=None, description="An optional description of the deployment."
    )
    version: Optional[str] = Field(
        default=None, description="An optional version for the deployment."
    )
    tags: List[str] = Field(
        default_factory=list,
        description="One or more tags to apply to this deployment.",
    )
    schedule: schemas.schedules.SCHEDULE_TYPES = None
    is_schedule_active: Optional[bool] = Field(
        default=None, description="Whether or not the schedule is active."
    )
    flow_name: Optional[str] = Field(default=None, description="The name of the flow.")
    work_queue_name: Optional[str] = Field(
        "default",
        description="The work queue for the deployment.",
        yaml_comment="The work queue that will handle this deployment's runs",
    )
    work_pool_name: Optional[str] = Field(
        default=None, description="The work pool for the deployment"
    )

    # flow data
    parameters: Dict[str, Any] = Field(default_factory=dict)
    manifest_path: Optional[str] = Field(
        default=None,
        description=(
            "The path to the flow's manifest file, relative to the chosen storage."
        ),
    )
    infrastructure: Infrastructure = Field(default_factory=Process)
    infra_overrides: Dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides to apply to the base infrastructure block at runtime.",
    )
    storage: Optional[Block] = Field(
        None,
        help="The remote storage to use for this workflow.",
    )
    path: Optional[str] = Field(
        default=None,
        description=(
            "The path to the working directory for the workflow, relative to remote"
            " storage or an absolute path."
        ),
    )
    entrypoint: Optional[str] = Field(
        default=None,
        description=(
            "The path to the entrypoint for the workflow, relative to the `path`."
        ),
    )
    parameter_openapi_schema: ParameterSchema = Field(
        default_factory=ParameterSchema,
        description="The parameter schema of the flow, including defaults.",
    )
    timestamp: datetime = Field(default_factory=partial(pendulum.now, "UTC"))

    @validator("infrastructure", pre=True)
    def infrastructure_must_have_capabilities(cls, value):
        if isinstance(value, dict):
            if "_block_type_slug" in value:
                # Replace private attribute with public for dispatch
                value["block_type_slug"] = value.pop("_block_type_slug")
            block = Block(**value)
        elif value is None:
            return value
        else:
            block = value

        if "run-infrastructure" not in block.get_block_capabilities():
            raise ValueError(
                "Infrastructure block must have 'run-infrastructure' capabilities."
            )
        return block

    @validator("storage", pre=True)
    def storage_must_have_capabilities(cls, value):
        if isinstance(value, dict):
            block_type = lookup_type(Block, value.pop("_block_type_slug"))
            block = block_type(**value)
        elif value is None:
            return value
        else:
            block = value

        capabilities = block.get_block_capabilities()
        if "get-directory" not in capabilities:
            raise ValueError(
                "Remote Storage block must have 'get-directory' capabilities."
            )
        return block

    @validator("parameter_openapi_schema", pre=True)
    def handle_openapi_schema(cls, value):
        """
        This method ensures setting a value of `None` is handled gracefully.
        """
        if value is None:
            return ParameterSchema()
        return value

    @classmethod
    @sync_compatible
    async def load_from_yaml(cls, path: str):
        data = yaml.safe_load(await anyio.Path(path).read_bytes())

        # load blocks from server to ensure secret values are properly hydrated
        if data["storage"]:
            block_doc_name = data["storage"].get("_block_document_name")
            # if no doc name, this block is not stored on the server
            if block_doc_name:
                block_slug = data["storage"]["_block_type_slug"]
                block = await Block.load(f"{block_slug}/{block_doc_name}")
                data["storage"] = block

        if data["infrastructure"]:
            block_doc_name = data["infrastructure"].get("_block_document_name")
            # if no doc name, this block is not stored on the server
            if block_doc_name:
                block_slug = data["infrastructure"]["_block_type_slug"]
                block = await Block.load(f"{block_slug}/{block_doc_name}")
                data["infrastructure"] = block

        return cls(**data)

    @sync_compatible
    async def load(self) -> bool:
        """
        Queries the API for a deployment with this name for this flow, and if found,
        prepopulates any settings that were not set at initialization.

        Returns a boolean specifying whether a load was successful or not.

        Raises:
            - ValueError: if both name and flow name are not set
        """
        if not self.name or not self.flow_name:
            raise ValueError("Both a deployment name and flow name must be provided.")
        async with get_client() as client:
            try:
                deployment = await client.read_deployment_by_name(
                    f"{self.flow_name}/{self.name}"
                )
                if deployment.storage_document_id:
                    storage = Block._from_block_document(
                        await client.read_block_document(
                            deployment.storage_document_id
                        )
                    )

                excluded_fields = self.__fields_set__.union(
                    {"infrastructure", "storage", "timestamp"}
                )
                for field in set(self.__fields__.keys()) - excluded_fields:
                    new_value = getattr(deployment, field)
                    setattr(self, field, new_value)

                if "infrastructure" not in self.__fields_set__:
                    if deployment.infrastructure_document_id:
                        self.infrastructure = Block._from_block_document(
                            await client.read_block_document(
                                deployment.infrastructure_document_id
                            )
                        )
                if "storage" not in self.__fields_set__:
                    if deployment.storage_document_id:
                        self.storage = Block._from_block_document(
                            await client.read_block_document(
                                deployment.storage_document_id
                            )
                        )
            except ObjectNotFound:
                return False
        return True

    @sync_compatible
    async def update(self, ignore_none: bool = False, **kwargs):
        """
        Performs an in-place update with the provided settings.

        Args:
            ignore_none: if True, all `None` values are ignored when performing the
                update
        """
        unknown_keys = set(kwargs.keys()) - set(self.dict().keys())
        if unknown_keys:
            raise ValueError(
                f"Received unexpected attributes: {', '.join(unknown_keys)}"
            )
        for key, value in kwargs.items():
            if ignore_none and value is None:
                continue
            setattr(self, key, value)

    @sync_compatible
    async def upload_to_storage(
        self, storage_block: str = None, ignore_file: str = ".prefectignore"
    ) -> Optional[int]:
        """
        Uploads the workflow this deployment represents using a provided storage
        block; if no block is provided, defaults to configuring self for local
        storage.
        """
        deployment_path = None
        file_count = None
        if storage_block:
            storage = await Block.load(storage_block)

            if "put-directory" not in storage.get_block_capabilities():
                raise BlockMissingCapabilities(
                    f"Storage block {storage!r} missing 'put-directory' capability."
                )

            self.storage = storage

            # upload current directory to storage location
            file_count = await self.storage.put_directory(
                ignore_file=ignore_file, to_path=self.path
            )
        elif self.storage:
            if "put-directory" not in self.storage.get_block_capabilities():
                raise BlockMissingCapabilities(
                    f"Storage block {self.storage!r} missing 'put-directory'"
                    " capability."
                )

            file_count = await self.storage.put_directory(
                ignore_file=ignore_file, to_path=self.path
            )

        # persists storage now in case it contains secret values
        if self.storage and not self.storage._block_document_id:
            await self.storage._save(is_anonymous=True)

        return file_count

    @sync_compatible
    async def apply(
        self, upload: bool = False, work_queue_concurrency: int = None
    ) -> UUID:
        """
        Registers this deployment with the API and returns the deployment's ID.

        Args:
            upload: if True, deployment files are automatically uploaded to remote
                storage
            work_queue_concurrency: If provided, sets the concurrency limit on the
                deployment's work queue
        """
        if not self.name or not self.flow_name:
            raise ValueError("Both a deployment name and flow name must be set.")
        async with get_client() as client:
            # prep IDs
            flow_id = await client.create_flow_from_name(self.flow_name)

            infrastructure_document_id = self.infrastructure._block_document_id
            if not infrastructure_document_id:
                # if not building off a block, will create an anonymous block
                self.infrastructure = self.infrastructure.copy()
                infrastructure_document_id = await self.infrastructure._save(
                    is_anonymous=True,
                )

            if upload:
                await self.upload_to_storage()

            if self.work_queue_name and work_queue_concurrency is not None:
                try:
                    res = await client.create_work_queue(
                        name=self.work_queue_name, work_pool_name=self.work_pool_name
                    )
                except ObjectAlreadyExists:
                    res = await client.read_work_queue_by_name(
                        name=self.work_queue_name, work_pool_name=self.work_pool_name
                    )
                await client.update_work_queue(
                    res.id, concurrency_limit=work_queue_concurrency
                )

            # we assume storage was already saved
            storage_document_id = getattr(self.storage, "_block_document_id", None)

            deployment_id = await client.create_deployment(
                flow_id=flow_id,
                name=self.name,
                work_queue_name=self.work_queue_name,
                work_pool_name=self.work_pool_name,
                version=self.version,
                schedule=self.schedule,
                is_schedule_active=self.is_schedule_active,
                parameters=self.parameters,
                description=self.description,
                tags=self.tags,
                manifest_path=self.manifest_path,  # allows for backwards YAML compat
                path=self.path,
                entrypoint=self.entrypoint,
                infra_overrides=self.infra_overrides,
                storage_document_id=storage_document_id,
                infrastructure_document_id=infrastructure_document_id,
                parameter_openapi_schema=self.parameter_openapi_schema.dict(),
            )

            return deployment_id

    @classmethod
    @sync_compatible
    async def build_from_flow(
        cls,
        flow: Flow,
        name: str,
        output: str = None,
        skip_upload: bool = False,
        ignore_file: str = ".prefectignore",
        apply: bool = False,
        load_existing: bool = True,
        **kwargs,
    ) -> "Deployment":
        """
        Configure a deployment for a given flow.
        """
        if not name:
            raise ValueError("A deployment name must be provided.")

        # note that `deployment.load` only updates settings that were *not*
        # provided at initialization
        deployment = cls(name=name, **kwargs)
        deployment.flow_name = flow.name
        if not deployment.entrypoint:
            ## first see if an entrypoint can be determined
            flow_file = getattr(flow, "__globals__", {}).get("__file__")
            mod_name = getattr(flow, "__module__", None)
            if not flow_file:
                if not mod_name:
                    # todo, check if the file location was manually set already
                    raise ValueError("Could not determine flow's file location.")
                module = importlib.import_module(mod_name)
                flow_file = getattr(module, "__file__", None)
                if not flow_file:
                    raise ValueError("Could not determine flow's file location.")

            # set entrypoint
            entry_path = Path(flow_file).absolute().relative_to(Path(".").absolute())
            deployment.entrypoint = f"{entry_path}:{flow.fn.__name__}"

        if load_existing:
            await deployment.load()

        # set a few attributes for this flow object
        deployment.parameter_openapi_schema = parameter_schema(flow)

        if not deployment.version:
            deployment.version = flow.version
        if not deployment.description:
            deployment.description = flow.description

        # proxy for whether infra is docker-based
        is_docker_based = hasattr(deployment.infrastructure, "image")

        if not deployment.storage and not is_docker_based and not deployment.path:
            deployment.path = str(Path(".").absolute())
        elif not deployment.storage and is_docker_based:
            # only update if a path is not already set
            if not deployment.path:
                deployment.path = "/opt/prefect/flows"

        if not skip_upload:
            if (
                deployment.storage
                and "put-directory" in deployment.storage.get_block_capabilities()
            ):
                await deployment.upload_to_storage(ignore_file=ignore_file)

        if output:
            await deployment.to_yaml(output)

        if apply:
            await deployment.apply()

        return deployment
```
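The `to_yaml` / `load_from_yaml` pair round-trips a deployment through an editable YAML specification. A minimal sketch, assuming a flow `my_flow` importable from a hypothetical `my_project.flows` module:

```python
from my_project.flows import my_flow  # hypothetical flow import
from prefect.deployments import Deployment

# Build a deployment and write its full specification to disk; fields above
# the "DO NOT EDIT" marker in the generated file are safe to hand-edit.
deployment = Deployment.build_from_flow(flow=my_flow, name="yaml-example")
deployment.to_yaml("deployment.yaml")

# Later (e.g., after editing the YAML), reload the spec and register it.
restored = Deployment.load_from_yaml("deployment.yaml")
restored.apply()
```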
`build_from_flow` configures a deployment for a given flow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `flow` | `Flow` | A flow function to deploy. | *required* |
| `name` | `str` | A name for the deployment. | *required* |
| `output` | `str` | If provided, the full deployment specification will be written as a YAML file in the location specified by `output`. | `None` |
| `skip_upload` | `bool` | If True, deployment files are not automatically uploaded to remote storage. | `False` |
| `ignore_file` | `str` | An optional path to a `.prefectignore` file that specifies filename patterns to ignore when uploading to remote storage; if not provided, looks for `.prefectignore` in the current working directory. | `'.prefectignore'` |
| `apply` | `bool` | If True, the deployment is automatically registered with the API. | `False` |
| `load_existing` | `bool` | If True, load any settings that may already be configured for the named deployment server-side (e.g., schedules, default parameter values, etc.). | `True` |
| `**kwargs` | | Other keyword arguments to pass to the constructor for the `Deployment` class. | `{}` |
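A sketch of `build_from_flow`'s optional side effects, again assuming a hypothetical `my_project.flows.my_flow`:

```python
from my_project.flows import my_flow  # hypothetical flow import
from prefect.deployments import Deployment

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="nightly",
    output="deployment.yaml",  # also write the full spec to a YAML file
    skip_upload=True,          # don't push files to remote storage yet
    apply=True,                # register the deployment with the API
)
```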
The `handle_openapi_schema` validator ensures that setting a value of `None` is handled gracefully.
Source code in src/prefect/deployments.py
```python
@validator("parameter_openapi_schema", pre=True)
def handle_openapi_schema(cls, value):
    """
    This method ensures setting a value of `None` is handled gracefully.
    """
    if value is None:
        return ParameterSchema()
    return value
```
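A quick illustration of the behavior this validator guarantees (a sketch; the deployment name is arbitrary):

```python
from prefect.deployments import Deployment

# Passing None instead of a schema is coerced to an empty ParameterSchema
# rather than raising a validation error.
d = Deployment(name="example", parameter_openapi_schema=None)
assert d.parameter_openapi_schema is not None
```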
`upload_to_storage` uploads the workflow this deployment represents using a provided storage block; if no block is provided, it defaults to configuring the deployment for local storage.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `storage_block` | `str` | A string reference to a remote storage block slug `$type/$name`; if provided, used to upload the workflow's project. | `None` |
| `ignore_file` | `str` | An optional path to a `.prefectignore` file that specifies filename patterns to ignore when uploading to remote storage; if not provided, looks for `.prefectignore` in the current working directory. | `'.prefectignore'` |
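A minimal sketch, assuming an S3 storage block was previously saved under the hypothetical name `dev-bucket` and a hypothetical `my_project.flows.my_flow`:

```python
from my_project.flows import my_flow  # hypothetical flow import
from prefect.deployments import Deployment

# Build without uploading, then upload explicitly via a block slug.
deployment = Deployment.build_from_flow(
    flow=my_flow, name="s3-example", skip_upload=True
)
file_count = deployment.upload_to_storage("s3/dev-bucket")
print(f"uploaded {file_count} files")
```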
`load_deployments_from_yaml` loads deployments from a YAML file.

```python
def load_deployments_from_yaml(
    path: str,
) -> PrefectObjectRegistry:
    """
    Load deployments from a yaml file.
    """
    with open(path, "r") as f:
        contents = f.read()

    # Parse into a yaml tree to retrieve separate documents
    nodes = yaml.compose_all(contents)

    with PrefectObjectRegistry(capture_failures=True) as registry:
        for node in nodes:
            with tmpchdir(path):
                deployment_dict = yaml.safe_load(yaml.serialize(node))
                # The return value is not necessary, just instantiating the Deployment
                # is enough to get it recorded on the registry
                parse_obj_as(Deployment, deployment_dict)

    return registry
```
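A usage sketch, assuming `deployments.yaml` contains one or more YAML documents previously produced by `Deployment.to_yaml` (the `get_instances` accessor on the returned registry is assumed here):

```python
from prefect.deployments import Deployment, load_deployments_from_yaml

registry = load_deployments_from_yaml("deployments.yaml")

# The registry records each Deployment instantiated while parsing.
for deployment in registry.get_instances(Deployment):
    deployment.apply()
```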
`load_flow_from_flow_run` loads a flow from the location/script provided in a deployment's storage document. If `ignore_storage=True` is passed, no pull from remote storage occurs; this flag is largely for testing and assumes the flow is already available locally.
```python
@inject_client
async def load_flow_from_flow_run(
    flow_run: schemas.core.FlowRun, client: PrefectClient, ignore_storage: bool = False
) -> Flow:
    """
    Load a flow from the location/script provided in a deployment's storage document.
    """
    deployment = await client.read_deployment(flow_run.deployment_id)
    logger = flow_run_logger(flow_run)

    if not ignore_storage:
        if deployment.storage_document_id:
            storage_document = await client.read_block_document(
                deployment.storage_document_id
            )
            storage_block = Block._from_block_document(storage_document)
        else:
            basepath = deployment.path or Path(deployment.manifest_path).parent
            storage_block = LocalFileSystem(basepath=basepath)

        sys.path.insert(0, ".")
        logger.info(f"Downloading flow code from storage at {deployment.path!r}")
        await storage_block.get_directory(from_path=deployment.path, local_path=".")

    import_path = relative_path_to_current_platform(deployment.entrypoint)
    logger.debug(f"Importing flow code from '{import_path}'")

    # for backwards compat
    if deployment.manifest_path:
        with open(deployment.manifest_path, "r") as f:
            import_path = json.load(f)["import_path"]
            import_path = (
                Path(deployment.manifest_path).parent / import_path
            ).absolute()

    flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, str(import_path))
    return flow
```
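Because the client is injected, the function can also be called with an explicit client. A sketch that loads the flow behind an existing flow run (the `get_client` import path is an assumption and varies across Prefect 2.x releases):

```python
from prefect.client.orchestration import get_client  # assumed import path
from prefect.deployments import load_flow_from_flow_run

async def fetch_flow(flow_run_id):
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
        # Pulls the deployment's storage locally, then imports the entrypoint
        return await load_flow_from_flow_run(flow_run, client=client)
```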
`run_deployment` creates a flow run for a deployment and returns it after completion or a timeout.
The function returns when the created flow run enters any terminal state or when the timeout is
reached; if the timeout is reached first, the flow run is returned even though it has not reached
a terminal state. When using a timeout, check the returned flow run's state before relying on its
completion.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `Union[str, UUID]` | The deployment id or deployment name in the form `'<flow-name>/<deployment-name>'`. | *required* |
| `parameters` | `Optional[dict]` | Parameter overrides for this flow run; merged with the deployment defaults. | `None` |
| `scheduled_time` | `Optional[datetime]` | The time to schedule the flow run for; defaults to scheduling the flow run to start now. | `None` |
| `flow_run_name` | `Optional[str]` | A name for the created flow run. | `None` |
| `timeout` | `Optional[float]` | The amount of time to wait for the flow run to complete before returning. Setting `timeout` to 0 returns the flow run immediately; setting it to `None` allows this function to poll indefinitely. | `None` |
| `poll_interval` | `Optional[float]` | The number of seconds between polls. | `5` |
| `tags` | `Optional[Iterable[str]]` | A list of tags to associate with this flow run; note that tags are used only for organizational purposes. | `None` |
| `idempotency_key` | `Optional[str]` | A unique value to recognize retries of the same run and prevent creating multiple flow runs. | `None` |
```python
@sync_compatible
@inject_client
async def run_deployment(
    name: Union[str, UUID],
    client: Optional[PrefectClient] = None,
    parameters: Optional[dict] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = 5,
    tags: Optional[Iterable[str]] = None,
    idempotency_key: Optional[str] = None,
):
    """
    Create a flow run for a deployment and return it after completion or a timeout.
    """
    if timeout is not None and timeout < 0:
        raise ValueError("`timeout` cannot be negative")

    if scheduled_time is None:
        scheduled_time = pendulum.now("UTC")

    parameters = parameters or {}

    deployment_id = None

    if isinstance(name, UUID):
        deployment_id = name
    else:
        try:
            deployment_id = UUID(name)
        except ValueError:
            pass

    if deployment_id:
        deployment = await client.read_deployment(deployment_id=deployment_id)
    else:
        deployment = await client.read_deployment_by_name(name)

    flow_run_ctx = FlowRunContext.get()
    if flow_run_ctx:
        # This was called from a flow. Link the flow run as a subflow.
        from prefect.engine import (
            Pending,
            _dynamic_key_for_task_run,
            collect_task_run_inputs,
        )

        task_inputs = {
            k: await collect_task_run_inputs(v) for k, v in parameters.items()
        }

        if deployment_id:
            flow = await client.read_flow(deployment.flow_id)
            deployment_name = f"{flow.name}/{deployment.name}"
        else:
            deployment_name = name

        # Generate a task in the parent flow run to represent the result of the subflow
        dummy_task = Task(
            name=deployment_name,
            fn=lambda: None,
            version=deployment.version,
        )
        # Override the default task key to include the deployment name
        dummy_task.task_key = f"{__name__}.run_deployment.{slugify(deployment_name)}"
        parent_task_run = await client.create_task_run(
            task=dummy_task,
            flow_run_id=flow_run_ctx.flow_run.id,
            dynamic_key=_dynamic_key_for_task_run(flow_run_ctx, dummy_task),
            task_inputs=task_inputs,
            state=Pending(),
        )
        parent_task_run_id = parent_task_run.id
    else:
        parent_task_run_id = None

    flow_run = await client.create_flow_run_from_deployment(
        deployment.id,
        parameters=parameters,
        state=Scheduled(scheduled_time=scheduled_time),
        name=flow_run_name,
        tags=tags,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
    )

    flow_run_id = flow_run.id

    if timeout == 0:
        return flow_run

    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run
            await anyio.sleep(poll_interval)

    return flow_run
```
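A usage sketch; the deployment name `'my-flow/example'` and its parameters are hypothetical. Because `run_deployment` is `@sync_compatible`, it can be called synchronously from synchronous code:

```python
from prefect.deployments import run_deployment

# Fire and forget: timeout=0 returns the scheduled flow run immediately.
flow_run = run_deployment(
    name="my-flow/example",
    parameters={"length": 5},
    timeout=0,
)

# Or block until the run reaches a terminal state, polling every 10 seconds.
finished = run_deployment(name="my-flow/example", poll_interval=10)
print(finished.state)
```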