Source code for ecl.image.v2.image_copy

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from ecl.image import image_service
from ecl import resource2

[docs]class ImageCopy(resource2.Resource): service = image_service.ImageService() base_path ='/' + service.version + '/extension/image_replicator/jobs' # Capabilities allow_create = True # Properties #: An identifier job's id for the image copying. job_id = resource2.Body('job_id') #: source image_id source_image_id = resource2.Body('source_image_id') #: source region_id source_region_id = resource2.Body('source_region_id') #: source tenant_id source_tenant_id = resource2.Body('source_tenant_id') #: destination image_id destination_image_id = resource2.Body('destination_image_id') #: destination region_id destination_region_id = resource2.Body('destination_region_id') #: destination tenant_id destination_tenant_id = resource2.Body('destination_tenant_id') #: job status status = resource2.Body('status') #: progress of the image copying copy_progress = resource2.Body('copy_progress')
[docs] def copy_image(self, session, image_id, tenant_id_dst): """Copy image to a specified region.""" uri = self.base_path resp = session.post( uri, headers={"Content-Type": "application/json"}, endpoint_filter = self.service, json={ "image_id":image_id, "tenant_id_dst":tenant_id_dst } ) self._translate_response(resp, has_body=True) return self
[docs] def cancel_copy_image(self, session, job_id): """Cancel a specified image copy job.""" uri = self.base_path + '/' + str(job_id) resp = session.delete(uri, endpoint_filter = self.service) self._translate_response(resp, has_body=False) return self
[docs] def list_image_copy_jobs(self, session): """Lists details for image copy jobs.""" uri = self.base_path resp = session.get(uri, endpoint_filter = self.service) resp = resp.json() for data in resp: value = self.existing(**data) yield value
[docs] def get_image_copy_job(self, session, job_id): """Get details for a specified image copy job.""" uri = self.base_path + '/' + str(job_id) resp = session.get(uri, endpoint_filter = self.service) self._translate_response(resp, has_body=True) return self