Label Studio SDK Project Module
This section covers the project operations you can perform with the SDK. To perform other operations, see the client, data manager, or utils modules.
Source code
""" .. include::../docs/project.md
"""
import json
import logging
import os
import pathlib
import time
from enum import Enum, auto
from pathlib import Path
from random import sample, shuffle
from typing import Optional, Union, List, Dict, Callable
from label_studio_tools.core.label_config import parse_config
from label_studio_tools.core.utils.io import get_local_path
from requests import Response
from requests.exceptions import HTTPError, InvalidSchema, MissingSchema
from .client import Client
from .utils import parse_config, chunk
logger = logging.getLogger(__name__)
class LabelStudioException(Exception):
pass
class LabelStudioAttributeError(LabelStudioException):
pass
class ProjectSampling(Enum):
"""Enumerate the available task sampling modes for labeling."""
RANDOM = "Uniform sampling"
""" Uniform random sampling of tasks """
SEQUENCE = "Sequential sampling"
""" Sequential sampling of tasks using task IDs """
UNCERTAINTY = "Uncertainty sampling"
""" Sample tasks based on prediction scores, such as for active learning (Enterprise only)"""
class ProjectStorage(Enum):
"""Enumerate the available types of external source and target storage for labeling projects."""
GOOGLE = "gcs"
""" Google Cloud Storage """
S3 = "s3"
""" Amazon S3 Storage """
AZURE = "azure_blob"
""" Microsoft Azure Blob Storage """
LOCAL = "localfiles"
""" Label Studio Local File Storage """
REDIS = "redis"
""" Redis Storage """
S3_SECURED = "s3s"
""" Amazon S3 Storage secured by IAM roles (Enterprise only) """
class AssignmentSamplingMethod(Enum):
RANDOM = auto() # produces uniform splits across annotators
class ExportSnapshotStatus:
CREATED = "created"
""" Export snapshot is created """
IN_PROGRESS = "in_progress"
""" Export snapshot is in progress """
FAILED = "failed"
""" Export snapshot failed with errors """
COMPLETED = "completed"
""" Export snapshot was created and can be downloaded """
def __init__(self, response):
self.response = response
def is_created(self):
"""Export snapshot is created"""
assert (
"status" in self.response
), '"status" field not found in export snapshot status response'
return self.response["status"] == self.CREATED
def is_in_progress(self):
"""Export snapshot is in progress"""
assert (
"status" in self.response
), '"status" field not found in export_snapshot_status response'
return self.response["status"] == self.IN_PROGRESS
def is_failed(self):
"""Export snapshot failed with errors"""
assert (
"status" in self.response
), '"status" field not found in export_snapshot_status response'
return self.response["status"] == self.FAILED
def is_completed(self):
"""Export snapshot was created and can be downloaded"""
assert (
"status" in self.response
), '"status" field not found in export_snapshot_status response'
return self.response["status"] == self.COMPLETED
class Project(Client):
def __init__(self, *args, **kwargs):
"""Initialize project class.
Parameters
----------
"""
super(Project, self).__init__(*args, **kwargs)
self.params = {}
def __getattr__(self, item):
return self._get_param(item)
@property
def parsed_label_config(self):
"""Get the parsed labeling configuration for the project. You can use this to more easily construct
annotation or prediction results based on your labeling configuration.
Returns
-------
dict
Object and control tags from the project labeling configuration.
Example with structured configuration of the form:
```
{
"<ControlTag>.name": {
"type": "ControlTag",
"to_name": ["<ObjectTag1>.name", "<ObjectTag2>.name"],
"inputs": [
{"type": "ObjectTag1", "value": "<ObjectTag1>.value"},
{"type": "ObjectTag2", "value": "<ObjectTag2>.value"}
],
"labels": ["Label1", "Label2", "Label3"]
}
}
```
`"labels"` are taken from the "alias" attribute if it exists, otherwise from "value"
"""
return parse_config(self.label_config)
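# --- Example (illustrative sketch, not part of the SDK source) ---
# For a labeling config with a single Choices control named "sentiment" over a
# Text tag named "text" (hypothetical names), the parsed config can be read as:
#
#     config = project.parsed_label_config
#     labels = config["sentiment"]["labels"]       # e.g. ["Positive", "Negative"]
#     to_name = config["sentiment"]["to_name"][0]  # "text"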
def get_members(self):
"""Get members from this project.
Parameters
----------
Returns
-------
list of `label_studio_sdk.users.User`
"""
from .users import User
assert self.is_enterprise, (
"Project members are available in the Enterprise edition of Label Studio only. "
"Use get_users() instead."
)
response = self.make_request("GET", f"/api/projects/{self.id}/members")
users = []
for user_data in response.json():
user_data["client"] = self
users.append(User(**user_data))
return users
def add_member(self, user):
"""Add a user to a project.
Parameters
----------
user: User
Returns
-------
dict
Dict with created member
"""
payload = {"user": user.id}
response = self.make_request(
"POST", f"/api/projects/{self.id}/members", json=payload
)
return response.json()
def assign_annotators(self, users, tasks_ids):
"""Assign annotators to tasks
Parameters
----------
users: list of user's objects
tasks_ids: list of integer task IDs to assign users to
Returns
-------
dict
Dict with counter of created assignments
"""
final_response = {"assignments": 0}
users_ids = [user.id for user in users]
# Assign tasks to users with batches
for c in chunk(tasks_ids, 1000):
logger.debug(f"Starting assignment for: {users_ids}")
payload = {
"users": users_ids,
"selectedItems": {"all": False, "included": c},
"type": "AN",
}
response = self.make_request(
"POST", f"/api/projects/{self.id}/tasks/assignees", json=payload
)
final_response["assignments"] += response.json()["assignments"]
return final_response
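# --- Example (sketch, Enterprise only) ---
# Assign the first two project members to the first hundred tasks; the slicing
# is illustrative, the methods are the ones defined in this module:
#
#     members = project.get_members()[:2]
#     task_ids = project.get_tasks_ids()[:100]
#     result = project.assign_annotators(members, task_ids)
#     print(result["assignments"])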
def delete_annotators_assignment(self, tasks_ids):
"""Remove all assigned annotators for tasks
Parameters
----------
tasks_ids: list of int
Returns
-------
dict
Dict with counter of deleted annotator assignments
"""
payload = {"selectedItems": {"all": False, "included": tasks_ids}}
response = self.make_request(
"POST",
f"/api/dm/actions?id=delete_annotators&project={self.id}",
json=payload,
)
return response.json()
def delete_reviewers_assignment(self, tasks_ids):
"""Clear all assigned reviewers for tasks
Parameters
----------
tasks_ids: list of int
Returns
-------
dict
Dict with counter of deleted reviewer assignments
"""
payload = {"selectedItems": {"all": False, "included": tasks_ids}}
response = self.make_request(
"POST",
f"/api/dm/actions?id=delete_reviewers&project={self.id}",
json=payload,
)
return response.json()
def assign_reviewers(self, users, tasks_ids):
"""Assign reviewers to tasks
Parameters
----------
users: list of user's objects
tasks_ids: list of integer task IDs to assign reviewers to
Returns
-------
dict
Dict with counter of created assignments
"""
payload = {
"users": [user.id for user in users],
"selectedItems": {"all": False, "included": tasks_ids},
"type": "RE",
}
response = self.make_request(
"POST", f"/api/projects/{self.id}/tasks/assignees", json=payload
)
return response.json()
def _get_param(self, param_name):
if param_name not in self.params:
self.update_params()
if param_name not in self.params:
raise LabelStudioAttributeError(
f'Project "{param_name}" field is not set'
)
return self.params[param_name]
def get_params(self):
"""Get all available project parameters.
Returns
--------
dict
containing all following params:
title: str
Project name.
description: str
Project description
label_config: str
Label config in XML format.
expert_instruction: str
Labeling instructions in HTML format
show_instruction: bool
Whether to display instructions to annotators before they start
show_skip_button: bool
Whether to show a skip button in the Label Studio UI and let annotators skip the task
enable_empty_annotation: bool
Allow annotators to submit empty annotations
show_annotation_history: bool
Show annotation history to annotator
organization: int
Organization ID
color: str
Color to decorate the project card in the Label Studio UI
maximum_annotations: int
Maximum number of annotations for one task. If the number of annotations per task is equal or greater
to this value, the task is finished and is_labeled=True is set. (Enterprise only)
is_published: bool
Whether or not the project is published to annotators (Enterprise only)
model_version: str
Machine learning model version for predictions or pre-annotations
is_draft: bool
Whether or not the project is in the middle of being created (Enterprise only)
created_by: object
Details about the user that created the project
min_annotations_to_start_training: int
Minimum number of completed tasks after which model training is started
show_collab_predictions: bool
Whether to show model predictions to the annotator, allowing them to collaborate with the ML model
sampling: str
Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only.
Enum: "Sequential sampling" "Uniform sampling" "Uncertainty sampling"
show_ground_truth_first: bool
Whether to show tasks with ground truth annotations first (Enterprise only)
show_overlap_first: bool
Whether to show tasks with overlap first (Enterprise only)
overlap_cohort_percentage: int
Percentage of tasks that must be annotated multiple times. (Enterprise only)
task_data_login: str
User credentials for accessing task data. (Enterprise only)
task_data_password: str
Password credentials for accessing task data. (Enterprise only)
control_weights: object
Weights for control tags used when calculating agreement metrics. (Enterprise only)
evaluate_predictions_automatically: bool
Retrieve and display predictions when loading a task
"""
response = self.make_request("GET", f"/api/projects/{self.id}")
return response.json()
def get_model_versions(self):
"""Get the list of available ML model versions from pre-annotations or connected ML backends.
Returns
-------
dict
Dict mapping each model version to the number of predictions made with that version
"""
response = self.make_request("GET", f"/api/projects/{self.id}/model-versions")
return response.json()
def update_params(self):
"""Get [all available project parameters](#label_studio_sdk.project.Project.get_params) and cache them."""
self.params = self.get_params()
def start_project(self, **kwargs):
"""Create a new labeling project in Label Studio.
Parameters
----------
title: str
Project name.
description: str
Project description
label_config: str
Label config in XML format.
expert_instruction: str
Labeling instructions in HTML format
show_instruction: bool
Whether to display instructions to annotators before they start
show_skip_button: bool
Whether to show a skip button in the Label Studio UI and let annotators skip the task
enable_empty_annotation: bool
Allow annotators to submit empty annotations
show_annotation_history: bool
Show annotation history to annotator
organization: int
Organization ID
color: str
Color to decorate the project card in the Label Studio UI
maximum_annotations: int
Maximum number of annotations for one task. If the number of annotations per task is equal or greater
to this value, the task is finished and is_labeled=True is set. (Enterprise only)
is_published: bool
Whether or not the project is published to annotators (Enterprise only)
model_version: str
Machine learning model version for predictions or pre-annotations
is_draft: bool
Whether or not the project is in the middle of being created (Enterprise only)
created_by: object
Details about the user that created the project
min_annotations_to_start_training: int
Minimum number of completed tasks after which model training is started
show_collab_predictions: bool
Whether to show model predictions to the annotator, allowing them to collaborate with the ML model
sampling: str
Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only.
Enum: "Sequential sampling" "Uniform sampling" "Uncertainty sampling"
show_ground_truth_first: bool
Whether to show tasks with ground truth annotations first (Enterprise only)
show_overlap_first: bool
Whether to show tasks with overlap first (Enterprise only)
overlap_cohort_percentage: int
Percentage of tasks that must be annotated multiple times. (Enterprise only)
task_data_login: str
User credentials for accessing task data. (Enterprise only)
task_data_password: str
Password credentials for accessing task data. (Enterprise only)
control_weights: object
Weights for control tags used when calculating agreement metrics. (Enterprise only)
evaluate_predictions_automatically: bool
Retrieve and display predictions when loading a task
Raises LabelStudioException in case of errors.
"""
response = self.make_request("POST", "/api/projects", json=kwargs)
if response.status_code == 201:
self.params = response.json()
else:
raise LabelStudioException("Project not created")
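# --- Example (sketch) ---
# Create a new project; the URL, API key, and labeling config are placeholders:
#
#     project = Project(url="http://localhost:8080", api_key="<your-token>")
#     project.start_project(
#         title="Sentiment",
#         label_config="""
#         <View>
#           <Text name="text" value="$text"/>
#           <Choices name="sentiment" toName="text">
#             <Choice value="Positive"/>
#             <Choice value="Negative"/>
#           </Choices>
#         </View>
#         """,
#     )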
@classmethod
def _create_from_id(cls, client, project_id, params=None):
project = cls(
url=client.url,
api_key=client.api_key,
session=client.session,
extra_headers=client.headers,
versions=client.versions,
make_request_raise=client.make_request_raise,
)
if params and isinstance(params, dict):
# TODO: validate project parameters
project.params = params
project.params["id"] = project_id
return project
@classmethod
def get_from_id(cls, client, project_id) -> "Project":
"""Class factory to create a project instance from an existing project ID.
Parameters
----------
client: class Client
project_id: int
Project ID
Returns
-------
`Project`
"""
project = cls._create_from_id(client, project_id)
project.update_params()
return project
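# --- Example (sketch) ---
# Attach to an existing project by its ID through a Client connection
# (the URL, token, and project ID are placeholders):
#
#     from label_studio_sdk import Client
#     ls = Client(url="http://localhost:8080", api_key="<your-token>")
#     project = Project.get_from_id(ls, 42)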
def import_tasks(self, tasks, preannotated_from_fields: List = None):
"""Import JSON-formatted labeling tasks. Tasks can be unlabeled or contain predictions.
Parameters
----------
tasks: list of dicts | dict | path to file
Tasks in <a href="https://labelstud.io/guide/tasks.html#Basic-Label-Studio-JSON-format">
Label Studio JSON format</a>
preannotated_from_fields: list of strings
Turns flat task JSON formatted like `{"column1": value, "column2": value}` into the Label Studio prediction
data format: `{"data": {"column1": ...}, "predictions": [{..., "column2": ...}]}`.
Useful when all your data is stored in tabular format with one column dedicated to model predictions.
Returns
-------
list of int
Imported task IDs
"""
params = {"return_task_ids": "1"}
if preannotated_from_fields:
params["preannotated_from_fields"] = ",".join(preannotated_from_fields)
if isinstance(tasks, (list, dict)):
response = self.make_request(
method="POST",
url=f"/api/projects/{self.id}/import",
json=tasks,
params=params,
timeout=(10, 600),
)
elif isinstance(tasks, (str, Path)):
# try import from file
if not os.path.isfile(tasks):
raise LabelStudioException(f"Import tasks file not found: {tasks}")
with open(tasks, mode="rb") as f:
response = self.make_request(
method="POST",
url=f"/api/projects/{self.id}/import",
files={"file": f},
params=params,
timeout=(10, 600),
)
else:
raise TypeError(
f'Not supported type provided as "tasks" argument: {type(tasks)}'
)
response = response.json()
if "import" in response:
# check import status
timeout = 300
fibonacci_backoff = [1, 1]
start_time = time.time()
while True:
import_status = self.make_request(
method="GET",
url=f'/api/projects/{self.id}/imports/{response["import"]}',
).json()
if import_status["status"] == "completed":
return import_status["task_ids"]
if import_status["status"] == "failed":
raise LabelStudioException(import_status["error"])
if time.time() - start_time >= timeout:
raise LabelStudioException("Import timeout")
time.sleep(fibonacci_backoff[0])
fibonacci_backoff = [
fibonacci_backoff[1],
fibonacci_backoff[0] + fibonacci_backoff[1],
]
return response["task_ids"]
def export_tasks(
self,
export_type: str = "JSON",
download_all_tasks: bool = False,
download_resources: bool = False,
ids: Optional[List[int]] = None,
export_location: Optional[str] = None,
) -> Union[list, pathlib.Path]:
"""Export annotated tasks.
Parameters
----------
export_type: string
Default export_type is JSON.
Specify another format type as referenced in <a href="https://github.com/heartexlabs/label-studio-converter/blob/master/label_studio_converter/converter.py#L32">
the Label Studio converter code</a>.
download_all_tasks: bool
Default download_all_tasks is False.
If true, download all tasks regardless of status. If false, download only annotated tasks.
download_resources: bool
Default download_resources is False.
If true, download all resource files such as images, audio, and others relevant to the tasks.
ids: list of ints
Optional, specify a list of task IDs to retrieve only the details for those tasks.
export_location: str or path
Optional, specify a location to save the export to; this is mandatory for the YOLO export.
If set, a pathlib.Path object is returned instead of the deserialized JSON.
Returns
-------
list of dicts if export_location is None
Tasks with annotations
pathlib.Path if export_location is not None
Path to the export
"""
params = {
"exportType": export_type,
"download_all_tasks": download_all_tasks,
"download_resources": download_resources,
}
if ids:
params["ids"] = ids
response = self.make_request(
method="GET", url=f"/api/projects/{self.id}/export", params=params
)
if export_location is None:
if "JSON" not in export_type.upper():
raise ValueError(
f"{export_type} export type requires an export location to be specified"
)
return response.json()
export_path = pathlib.Path(export_location)
# ensure that parent location exists even if it is in some subdirectory
export_path.parent.mkdir(parents=True, exist_ok=True)
with open(export_path, "wb") as out_file:
# stream the export in 1 KiB chunks; avoid shadowing the imported `chunk` helper
for data_chunk in response.iter_content(chunk_size=1024):
out_file.write(data_chunk)
return export_path
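# --- Example (sketch) ---
# Export annotated tasks as JSON in memory, or stream a CSV snapshot to disk
# (CSV is one of the converter formats linked in the docstring above):
#
#     annotated = project.export_tasks()  # list of dicts
#     path = project.export_tasks("CSV", export_location="exports/tasks.csv")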
def set_params(self, **kwargs):
"""Low level function to set project parameters."""
response = self.make_request("PATCH", f"/api/projects/{self.id}", json=kwargs)
assert response.status_code == 200
def set_sampling(self, sampling: ProjectSampling):
"""Set the project sampling method for the labeling stream."""
self.set_params(sampling=sampling.value)
def set_published(self, is_published: bool):
"""Set the project publication state. (Enterprise only)
Parameters
----------
is_published: bool
Project publication state for reviewers and annotators
"""
self.set_params(is_published=is_published)
def set_model_version(self, model_version: str):
"""Set the current model version to use for displaying predictions to annotators, perform uncertainty sampling
and annotation evaluations in Label Studio Enterprise, and other operations.
Parameters
----------
model_version: string
It can be any string you want
"""
self.set_params(model_version=model_version)
def get_tasks(
self,
filters=None,
ordering=None,
view_id=None,
selected_ids=None,
only_ids: bool = False,
):
"""Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a
predefined view ID.
Parameters
----------
filters: label_studio_sdk.data_manager.Filters.create()
JSON objects representing Data Manager filters. Use `label_studio_sdk.data_manager.Filters.create()`
helper to create it.
Example:
```json
{
"conjunction": "and",
"items": [
{
"filter": "filter:tasks:id",
"operator": "equal",
"type": "Number",
"value": 1
}
]
}
```
ordering: list of label_studio_sdk.data_manager.Column
List with <b>one</b> string representing Data Manager ordering.
Use `label_studio_sdk.data_manager.Column` helper class.
Example:
```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order
view_id: int
View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items
selected_ids: list of ints
Task IDs
only_ids: bool
If true, return only task IDs
Returns
-------
list
Task list with task data, annotations, predictions and other fields from the Data Manager
"""
page = 1
result = []
data = {}
while not data.get("end_pagination"):
try:
data = self.get_paginated_tasks(
filters=filters,
ordering=ordering,
view_id=view_id,
selected_ids=selected_ids,
only_ids=only_ids,
page=page,
page_size=100,
)
result += data["tasks"]
page += 1
except LabelStudioException as e:
logger.debug(f"Error during pagination: {e}")
break
return result
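# --- Example (sketch) ---
# Fetch tasks matching a Data Manager filter, built with the helpers from
# `label_studio_sdk.data_manager` mentioned in the docstring above:
#
#     from label_studio_sdk.data_manager import Filters, Column, Operator, Type
#     filters = Filters.create(Filters.AND, [
#         Filters.item(Column.id, Operator.GREATER, Type.Number, Filters.value(100))
#     ])
#     tasks = project.get_tasks(filters=filters)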
def get_paginated_tasks(
self,
filters=None,
ordering=None,
view_id=None,
selected_ids=None,
page: int = 1,
page_size: int = 100,
only_ids: bool = False,
resolve_uri: bool = True,
):
"""Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a
predefined view ID. For non-existent pages it returns 404 error.
Parameters
----------
filters: label_studio_sdk.data_manager.Filters.create()
JSON objects representing Data Manager filters. Use `label_studio_sdk.data_manager.Filters.create()`
helper to create it.
Example:
{
"conjunction": "and",
"items": [
{
"filter": "filter:tasks:id",
"operator": "equal",
"type": "Number",
"value": 1
}
]
}
ordering: list of label_studio_sdk.data_manager.Column
List with <b>one</b> string representing Data Manager ordering.
Use `label_studio_sdk.data_manager.Column` helper class.
Example:
```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order
view_id: int
View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items
selected_ids: list of ints
Task IDs
page: int
Page. Default is 1.
page_size: int
Page size. Default is 100. To retrieve all tasks in the project, use get_tasks() instead.
only_ids: bool
If true, return only task IDs
resolve_uri: bool
Resolve pre-sign urls to https links
Returns
-------
dict
Example:
{
"tasks": [{...}],
"total_annotations": 50,
"total_predictions": 100,
"total": 100
}
tasks: list of dicts
Tasks with task data, annotations, predictions and other fields from the Data Manager
total: int
Total number of tasks in filtered result
total_annotations: int
Total number of annotations in filtered tasks
total_predictions: int
Total number of predictions in filtered tasks
"""
query = {
"filters": filters,
"ordering": ordering or [],
"selectedItems": (
{"all": False, "included": selected_ids}
if selected_ids
else {"all": True, "excluded": []}
),
}
params = {
"project": self.id,
"page": page,
"page_size": page_size,
"view": view_id,
"query": json.dumps(query),
"fields": "all",
"resolve_uri": resolve_uri,
}
if only_ids:
params["include"] = "id"
response = self.make_request(
"GET", "/api/tasks", params, raise_exceptions=False
)
# we'll get 404 from API on empty page
if response.status_code == 404:
return {"tasks": [], "end_pagination": True}
elif response.status_code != 200:
self.log_response_error(response)
try:
response.raise_for_status()
except HTTPError as e:
raise LabelStudioException(f"Error loading tasks: {e}")
data = response.json()
tasks = data["tasks"]
if only_ids:
data["tasks"] = [task["id"] for task in tasks]
return data
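# --- Example (sketch) ---
# Walk pages manually instead of calling get_tasks(); this mirrors what
# get_tasks() does internally above (`handle` is a placeholder for your code):
#
#     page = 1
#     while True:
#         data = project.get_paginated_tasks(page=page, page_size=100)
#         if data.get("end_pagination"):
#             break
#         handle(data["tasks"])
#         page += 1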
def get_tasks_ids(self, *args, **kwargs):
"""Same as `label_studio_sdk.project.Project.get_tasks()` but returns only task IDs."""
kwargs["only_ids"] = True
return self.get_tasks(*args, **kwargs)
def get_paginated_tasks_ids(self, *args, **kwargs):
"""Same as `label_studio_sdk.project.Project.get_paginated_tasks()` but returns
only task IDs.
"""
kwargs["only_ids"] = True
return self.get_paginated_tasks(*args, **kwargs)
def get_views(self):
"""Get all views related to the project
Returns
-------
list
List of view dicts
Each dict contains the following fields:
id: int
View ID
project: int
Project ID
user: int
User ID who created this tab
data: dict
Filters, orderings and other visual settings
"""
response = self.make_request("GET", f"/api/dm/views?project={self.id}")
return response.json()
def create_view(self, filters, ordering=None, title="Tasks"):
"""Create view
Parameters
----------
filters: dict
Specify the filters(`label_studio_sdk.data_manager.Filters`) of the view
ordering: list of label_studio_sdk.data_manager.Column
List with <b>one</b> string representing Data Manager ordering.
Use `label_studio_sdk.data_manager.Column` helper class.
Example:
```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order
title: str
Tab name
Returns
-------
dict:
dict with created view
"""
data = {
"project": self.id,
"data": {"title": title, "ordering": ordering, "filters": filters},
}
response = self.make_request("POST", "/api/dm/views", json=data)
return response.json()
def delete_view(self, view_id):
"""Delete view
Parameters
----------
view_id: int
View ID
Returns
-------
None
The view is deleted; the API response body is not returned.
"""
response = self.make_request("DELETE", f"/api/dm/views/{view_id}")
return
@property
def tasks(self):
"""Retrieve all tasks from the project. This call can be very slow if the project has a lot of tasks."""
return self.get_tasks()
@property
def tasks_ids(self):
"""IDs for all tasks for a project. This call can be very slow if the project has lots of tasks."""
return self.get_tasks_ids()
def get_labeled_tasks(self, only_ids=False):
"""Retrieve all tasks that have been completed, i.e. where requested number of annotations have been created
Parameters
----------
only_ids: bool
Return only task IDs.
Returns
-------
list
List of task dicts, the same as in `get_tasks`.
"""
return self.get_tasks(
filters={
"conjunction": "and",
"items": [
{
"filter": "filter:tasks:completed_at",
"operator": "empty",
"value": False,
"type": "Datetime",
}
],
},
only_ids=only_ids,
)
def get_labeled_tasks_ids(self):
"""Retrieve all task IDs for completed tasks, i.e. where requested number of annotations have been created
Returns
-------
list
List of task IDs
"""
return self.get_labeled_tasks(only_ids=True)
def get_unlabeled_tasks(self, only_ids=False):
"""Retrieve all tasks that are <b>not</b> completed.
If using Label Studio Enterprise, this can include tasks that have been labeled one or more times, but not the full number of times defined in the
project labeling settings.
Parameters
----------
only_ids: bool
Return only task IDs
Returns
-------
list
List of task dicts, the same as in `get_tasks`.
"""
return self.get_tasks(
filters={
"conjunction": "and",
"items": [
{
"filter": "filter:tasks:completed_at",
"operator": "empty",
"value": True,
"type": "Datetime",
}
],
},
only_ids=only_ids,
)
def get_unlabeled_tasks_ids(self):
"""Retrieve all task IDs for tasks that are <b>not</b> completed. If using
Label Studio Enterprise, this can include tasks that have been labeled one or more times, but not the full
number of times defined in the project labeling settings.
Returns
-------
list
List of task IDs
"""
return self.get_unlabeled_tasks(only_ids=True)
def get_task(self, task_id):
"""Get specific task by ID.
Parameters
----------
task_id: int
Task ID you want to retrieve
Returns
-------
dict:
dict of task data containing all initial data and annotation results in [Label Studio JSON format](https://labelstud.io/guide/tasks.html#Basic-Label-Studio-JSON-format)
```
id: int
Task ID
predictions: dict
Predictions object
annotations: dict
Annotations object
drafts: dict
Drafts object
data: object
User imported or uploaded data for a task. Data is formatted according to the project label config.
meta: object
Meta is user imported (uploaded) data and can be useful as input for an ML Backend for embeddings, advanced vectors, and other info. It is passed to ML during training/predicting steps.
(Deprecated)
created_at: str
Date time string representing the time a task was created.
updated_at: str
Date time string representing the last time a task was updated.
is_labeled: bool
True if the number of annotations for this task is greater than or equal to the number of maximum_completions for the project.
overlap: int
Number of distinct annotators that processed the current task.
project: int
Project ID for this task
file_upload: str
Uploaded file used as data source for this task
```
"""
response = self.make_request("GET", f"/api/tasks/{task_id}")
return response.json()
def update_task(self, task_id, **kwargs):
"""Update specific task by ID.
Parameters
----------
task_id: int
Task ID you want to update
kwargs: kwargs parameters
List of parameters to update. Check all available parameters [here](https://labelstud.io/api#operation/api_tasks_partial_update)
Returns
-------
dict:
Dict with updated task
"""
response = self.make_request("PATCH", f"/api/tasks/{task_id}", json=kwargs)
response.raise_for_status()
return response.json()
def create_prediction(
self,
task_id: int,
result: Optional[Union[List[Dict], Dict, str]] = None,
score: Optional[float] = 0,
model_version: Optional[str] = None,
):
"""Create a prediction for a specific task.
Parameters
----------
task_id: int
Task ID
result: list or dict or str
Result in the <a href="https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks">
Label Studio JSON format as for annotations</a>.
For the labeling config:
<View>
<Image name="image" value="$value"/>
<Choices name="class_name" toName="image">
<Choice value="Class A"/>
<Choice value="Class B"/>
</Choices>
</View>
The following inputs are equivalent; `result` can be the full `"predictions"` array:
[{
"from_name": "class_name",
"to_name": "image",
"type": "choices",
"value": {
"choices": ["Class A"]
}
}]
or just `"value"` payload
{"choices": ["Class A"]}
or just the class name:
"Class A"
score: float
Model prediction score
model_version: str
Any string identifying your model
"""
data = {"task": task_id, "result": result, "score": score}
if model_version is not None:
data["model_version"] = model_version
response = self.make_request("POST", "/api/predictions", json=data)
prediction = response.json()  # renamed to avoid shadowing the json module
logger.debug(f"Response: {prediction}")
return prediction
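# --- Example (sketch) ---
# Attach a prediction using the shorthand "value" payload described above;
# the task ID and control values are placeholders:
#
#     project.create_prediction(
#         task_id=123,
#         result={"choices": ["Class A"]},
#         score=0.95,
#         model_version="my-model-v1",
#     )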
def create_predictions(self, predictions):
"""Bulk create predictions for tasks. See <a href="https://labelstud.io/guide/predictions.html">more
details about pre-annotated tasks</a>.
Parameters
----------
predictions: list of dicts
List of dicts with predictions in the <a href="https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks">
Label Studio JSON format as for annotations</a>.
"""
response = self.make_request(
"POST", f"/api/projects/{self.id}/import/predictions", json=predictions
)
return response.json()
def create_annotations_from_predictions(self, model_versions=None):
"""Create annotations from all predictions that exist for project tasks from specific ML model versions.
Parameters
----------
model_versions: list or None
Convert predictions with these model versions to annotations. If `None`, all existing model versions are used
Returns
-------
dict
Dict with counter of created predictions
"""
payload = {
"filters": {"conjunction": "and", "items": []},
"model_version": model_versions,
"ordering": [],
"project": self.id,
"selectedItems": {"all": True, "excluded": []},
}
response = self.make_request(
"POST",
"/api/dm/actions",
params={"id": "predictions_to_annotations", "project": self.id},
json=payload,
)
return response.json()
def list_annotations(self, task_id: int) -> List:
"""List all annotations for a task.
Parameters
----------
task_id: int
Task ID
Returns
-------
list of dict:
List of annotations objects
"""
response = self.make_request("GET", f"/api/tasks/{task_id}/annotations")
response.raise_for_status()
return response.json()
def create_annotation(self, task_id: int, **kwargs) -> Dict:
"""Add annotations to a task like an annotator does.
Parameters
----------
task_id: int
Task ID you want to update
kwargs: kwargs parameters
List of parameters to create. Check all available parameters [here](https://labelstud.io/api#operation/api_tasks_annotations_create).
Labeling is stored in the `result` field as a list of dictionaries, [{...}, {...}, ...]
Returns
-------
dict:
Dict with created annotation
"""
response = self.make_request(
"POST", f"/api/tasks/{task_id}/annotations/", json=kwargs
)
response.raise_for_status()
return response.json()
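# --- Example (sketch) ---
# Submit an annotation the way an annotator would; tag names are hypothetical
# and must match your labeling config:
#
#     project.create_annotation(
#         task_id=123,
#         result=[{
#             "from_name": "sentiment",
#             "to_name": "text",
#             "type": "choices",
#             "value": {"choices": ["Positive"]},
#         }],
#     )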
def get_annotation(self, annotation_id: int) -> dict:
"""Retrieve a specific annotation for a task using the annotation ID.
Parameters
----------
annotation_id: int
A unique integer value identifying this annotation.
Returns
-------
dict
Retrieved annotation object
"""
response = self.make_request("GET", f"/api/annotations/{annotation_id}")
response.raise_for_status()
return response.json()
def update_annotation(self, annotation_id, **kwargs):
"""Update specific annotation with new annotation parameters, e.g.
```
project.update_annotation(annotation_id=123, ground_truth=True)
```
Parameters
----------
annotation_id: int
Existing annotation ID from current project. Could be retrieved from `project.get_tasks()` response
kwargs: kwargs parameters
List of annotation parameters. Check all available parameters [here](https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks)
Returns
-------
dict
Dict with updated annotation
"""
response = self.make_request(
"PATCH", f"/api/annotations/{annotation_id}", json=kwargs
)
response.raise_for_status()
return response.json()
def delete_annotation(self, annotation_id: int) -> int:
"""Delete an annotation using the annotation ID. This action can't be undone!
Parameters
----------
annotation_id: int
A unique integer value identifying this annotation.
Returns
-------
int
Status code for operation
"""
response = self.make_request("DELETE", f"/api/annotations/{annotation_id}")
response.raise_for_status()
return response.status_code
def get_predictions_coverage(self):
"""Prediction coverage stats for all model versions for the project.
Returns
-------
dict
Example:
{
"2021-01-01": 0.9,
"2021-02-01": 0.7
}
`0.9` means that 90% of project tasks are covered by predictions with model_version `"2021-01-01"`
"""
model_versions = self.get_model_versions()
params = self.get_params()
tasks_number = params["task_number"]
coverage = {
model_version: count / tasks_number
for model_version, count in model_versions.items()
}
return coverage
def get_predictions_conflict(self):
raise NotImplementedError
def get_predictions_precision(self):
raise NotImplementedError
def connect_google_import_storage(
self,
bucket: str,
prefix: Optional[str] = None,
regex_filter: Optional[str] = None,
use_blob_urls: Optional[bool] = True,
google_application_credentials: Optional[str] = None,
presign: Optional[bool] = True,
presign_ttl: Optional[int] = 1,
title: Optional[str] = "",
description: Optional[str] = "",
):
"""Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as source storage and import tasks.
Parameters
----------
bucket: string
Specify the name of the GCS bucket
prefix: string
Optional, specify the prefix or folder within the GCS bucket with your data
regex_filter: string
Optional, specify a regex filter to use to match the file types of your data
use_blob_urls: bool
Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
google_application_credentials: string
Optional, provide a file with your Google application credentials. If not specified, it will use path stored in `GOOGLE_APPLICATION_CREDENTIALS` environmental variable. Read more about [Google Cloud authentication](https://cloud.google.com/docs/authentication/getting-started)
presign: bool
Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int
Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string
Optional, specify a title for your GCS import storage that appears in Label Studio.
description: string
Optional, specify a description for your GCS import storage.
Returns
-------
dict:
containing the same fields as in the request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
if google_application_credentials and os.path.isfile(
google_application_credentials
):
with open(google_application_credentials) as f:
google_application_credentials = f.read()
payload = {
"bucket": bucket,
"project": self.id,
"prefix": prefix,
"regex_filter": regex_filter,
"use_blob_urls": use_blob_urls,
"google_application_credentials": google_application_credentials,
"presign": presign,
"presign_ttl": presign_ttl,
"title": title,
"description": description,
}
response = self.make_request("POST", "/api/storages/gcs", json=payload)
return response.json()
def connect_google_export_storage(
self,
bucket: str,
prefix: Optional[str] = None,
google_application_credentials: Optional[str] = None,
title: Optional[str] = "",
description: Optional[str] = "",
can_delete_objects: bool = False,
):
"""Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as target storage and export tasks.
Parameters
----------
bucket: string
Specify the name of the GCS bucket
prefix: string
Optional, specify the prefix or folder within the GCS bucket to export your data to
google_application_credentials: string
Optional, provide a file with your Google application credentials. If not specified, it will use path stored in `GOOGLE_APPLICATION_CREDENTIALS` environmental variable. Read more about [Google Cloud authentication](https://cloud.google.com/docs/authentication/getting-started)
title: string
Optional, specify a title for your GCS export storage that appears in Label Studio.
description: string
Optional, specify a description for your GCS export storage.
can_delete_objects: bool
False by default. Specify whether to delete tasks in the GCS bucket if they are deleted in Label Studio.
Returns
-------
dict:
containing the same fields as in the request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
if google_application_credentials and os.path.isfile(google_application_credentials):
with open(google_application_credentials) as f:
google_application_credentials = f.read()
payload = {
"bucket": bucket,
"prefix": prefix,
"google_application_credentials": google_application_credentials,
"title": title,
"description": description,
"can_delete_objects": can_delete_objects,
"project": self.id,
}
response = self.make_request("POST", "/api/storages/export/gcs", json=payload)
return response.json()
def connect_s3_import_storage(
self,
bucket: str,
prefix: Optional[str] = None,
regex_filter: Optional[str] = None,
use_blob_urls: Optional[bool] = True,
presign: Optional[bool] = True,
presign_ttl: Optional[int] = 1,
title: Optional[str] = "",
description: Optional[str] = "",
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
region_name: Optional[str] = None,
s3_endpoint: Optional[str] = None,
recursive_scan: Optional[bool] = False,
):
"""Connect an Amazon S3 bucket to Label Studio to use as source storage and import tasks.
Parameters
----------
bucket: string
Specify the name of the S3 bucket.
prefix: string
Optional, specify the prefix within the S3 bucket to import your data from.
regex_filter: string
Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool
Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
presign: bool
Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int
Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string
Optional, specify a title for your S3 import storage that appears in Label Studio.
description: string
Optional, specify a description for your S3 import storage.
aws_access_key_id: string
Optional, specify the access key ID for your bucket.
aws_secret_access_key: string
Optional, specify the secret access key for your bucket.
aws_session_token: string
Optional, specify a session token to use to access your bucket.
region_name: string
Optional, specify the AWS region of your S3 bucket.
s3_endpoint: string
Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method.
recursive_scan: bool
Optional, specify whether to perform recursive scan over the bucket content.
Returns
-------
dict:
containing the same fields as in the request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
payload = {
"bucket": bucket,
"prefix": prefix,
"regex_filter": regex_filter,
"use_blob_urls": use_blob_urls,
"aws_access_key_id": aws_access_key_id,
"aws_secret_access_key": aws_secret_access_key,
"aws_session_token": aws_session_token,
"region_name": region_name,
"s3_endpoint": s3_endpoint,
"presign": presign,
"presign_ttl": presign_ttl,
"title": title,
"description": description,
"project": self.id,
"recursive_scan": recursive_scan,
}
response = self.make_request("POST", "/api/storages/s3", json=payload)
return response.json()
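# --- Example (sketch) ---
# Connect an S3 bucket as source storage; the bucket name, prefix, and
# credentials are placeholders (environment-based credentials are also possible):
#
#     storage = project.connect_s3_import_storage(
#         bucket="my-bucket",
#         prefix="images/",
#         regex_filter=r".*\.jpg",
#         use_blob_urls=True,
#         aws_access_key_id="...",
#         aws_secret_access_key="...",
#     )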
def connect_s3s_iam_import_storage(
self,
role_arn: str,
external_id: Optional[str] = None,
bucket: Optional[str] = None,
prefix: Optional[str] = None,
regex_filter: Optional[str] = None,
use_blob_urls: Optional[bool] = True,
presign: Optional[bool] = True,
presign_ttl: Optional[int] = 1,
title: Optional[str] = "",
description: Optional[str] = "",
region_name: Optional[str] = None,
s3_endpoint: Optional[str] = None,
recursive_scan: Optional[bool] = False,
aws_sse_kms_key_id: Optional[str] = None,
):
"""Create S3 secured import storage with IAM role access. Enterprise only.
Parameters
----------
role_arn: string
Required, specify the AWS Role ARN to assume.
external_id: string or None
Optional, specify the external ID to use to assume the role. If None, the SDK calls api/organizations/<id>
and uses the external_id from the response. You can find this ID on the organization page in the Label Studio UI.
bucket: string
Specify the name of the S3 bucket.
prefix: string
Optional, specify the prefix within the S3 bucket to import your data from.
regex_filter: string
Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool
Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
presign: bool
Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int
Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string
Optional, specify a title for your S3 import storage that appears in Label Studio.
description: string
Optional, specify a description for your S3 import storage.
region_name: string
Optional, specify the AWS region of your S3 bucket.
s3_endpoint: string
Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method.
recursive_scan: bool
Optional, specify whether to perform recursive scan over the bucket content.
aws_sse_kms_key_id: string
Optional, specify an AWS SSE KMS Key ID for server-side encryption.
Returns
-------
dict:
containing the response from the API, including the storage ID and type, plus synchronization
details and status fields (synchronizable, last_sync, last_sync_count, last_sync_job, status, traceback, meta).
"""
if external_id is None:
organization = self.get_organization()
external_id = organization["external_id"]
payload = {
"bucket": bucket,
"prefix": prefix,
"regex_filter": regex_filter,
"use_blob_urls": use_blob_urls,
"presign": presign,
"presign_ttl": presign_ttl,
"title": title,
"description": description,
"recursive_scan": recursive_scan,
"role_arn": role_arn,
"region_name": region_name,
"s3_endpoint": s3_endpoint,
"aws_sse_kms_key_id": aws_sse_kms_key_id,
"project": self.id,
"external_id": external_id,
}
response = self.make_request("POST", "/api/storages/s3s/", json=payload)
return response.json()
def connect_s3_export_storage(
self,
bucket: str,
prefix: Optional[str] = None,
title: Optional[str] = "",
description: Optional[str] = "",
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
region_name: Optional[str] = None,
s3_endpoint: Optional[str] = None,
can_delete_objects: bool = False,
):
"""Connect an Amazon S3 bucket to Label Studio to use as target storage and export tasks.
Parameters
----------
bucket: string
Specify the name of the S3 bucket.
prefix: string
Optional, specify the prefix or folder within the S3 bucket to export your data to.
title: string
Optional, specify a title for your S3 export storage that appears in Label Studio.
description: string
Optional, specify a description for your S3 export storage.
aws_access_key_id: string
Optional, specify the access key ID for your bucket.
aws_secret_access_key: string
Optional, specify the secret access key for your bucket.
aws_session_token: string
Optional, specify a session token to use to access your bucket.
region_name: string
Optional, specify the AWS region of your S3 bucket.
s3_endpoint: string
Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method.
can_delete_objects: bool
False by default. Specify whether to delete tasks in the S3 bucket if they are deleted in Label Studio.
Returns
-------
dict:
containing the same fields as in the request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
payload = {
"bucket": bucket,
"prefix": prefix,
"aws_access_key_id": aws_access_key_id,
"aws_secret_access_key": aws_secret_access_key,
"aws_session_token": aws_session_token,
"region_name": region_name,
"s3_endpoint": s3_endpoint,
"title": title,
"description": description,
"can_delete_objects": can_delete_objects,
"project": self.id,
}
response = self.make_request("POST", "/api/storages/export/s3", json=payload)
return response.json()
def connect_azure_import_storage(
self,
container: str,
prefix: Optional[str] = None,
regex_filter: Optional[str] = None,
use_blob_urls: Optional[bool] = True,
presign: Optional[bool] = True,
presign_ttl: Optional[int] = 1,
title: Optional[str] = "",
description: Optional[str] = "",
account_name: Optional[str] = None,
account_key: Optional[str] = None,
):
"""Connect a Microsoft Azure BLOB storage container to Label Studio to use as source storage and import tasks.
Parameters
----------
container: string
Specify the name of the Azure container.
prefix: string
Optional, specify the prefix or folder within the Azure container with your data.
regex_filter: string
Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool
Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
presign: bool
Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int
Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string
Optional, specify a title for your Azure import storage that appears in Label Studio.
description: string
Optional, specify a description for your Azure import storage.
account_name: string
Optional, specify the name of the account with access to the container.
account_key: string
Optional, specify the key for the account with access to the container.
Returns
-------
dict:
containing the same fields as in the request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
payload = {
"container": container,
"prefix": prefix,
"regex_filter": regex_filter,
"use_blob_urls": use_blob_urls,
"account_name": account_name,
"account_key": account_key,
"presign": presign,
"presign_ttl": presign_ttl,
"title": title,
"description": description,
"project": self.id,
}
response = self.make_request("POST", "/api/storages/azure", json=payload)
return response.json()
def connect_azure_export_storage(
self,
container: str,
prefix: Optional[str] = None,
title: Optional[str] = "",
description: Optional[str] = "",
account_name: Optional[str] = None,
account_key: Optional[str] = None,
can_delete_objects: bool = False,
):
"""Connect Microsoft Azure BLOB storage to Label Studio to use as target storage and export tasks.
Parameters
----------
container: string
Specify the name of the Azure storage container.
prefix: string
Optional, specify the prefix or folder within the Azure container to export your data to.
title: string
Optional, specify a title for your Azure export storage that appears in Label Studio.
description: string
Optional, specify a description for your Azure export storage.
can_delete_objects: bool
False by default. Specify whether to delete tasks in the Azure container if they are deleted in Label Studio.
account_name: string
Optional, specify the name of the account with access to the container.
account_key: string
Optional, specify the key for the account with access to the container.
Returns
-------
dict:
containing the same fields as in the request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
payload = {
"container": container,
"prefix": prefix,
"account_name": account_name,
"account_key": account_key,
"title": title,
"description": description,
"can_delete_objects": can_delete_objects,
"project": self.id,
}
response = self.make_request("POST", "/api/storages/export/azure", json=payload)
return response.json()
def connect_local_import_storage(
self,
local_store_path: str,
regex_filter: Optional[str] = None,
use_blob_urls: Optional[bool] = True,
title: Optional[str] = "",
description: Optional[str] = "",
):
"""Connect a Local storage to Label Studio to use as source storage and import tasks.
Parameters
----------
local_store_path: string
Path to declare as local storage.
regex_filter: string
Optional, specify a regex filter to use to match the file types of your data
use_blob_urls: bool
Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
title: string
Optional, specify a title for your local import storage that appears in Label Studio.
description: string
Optional, specify a description for your local import storage.
Returns
-------
dict:
containing the same fields as in the request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
if "LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT" not in os.environ:
raise ValueError(
"To use connect_local_import_storage() you should set "
"LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT environment variable, "
"read more: https://labelstud.io/guide/storage.html#Prerequisites-2"
)
root = os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"]
if not os.path.isdir(local_store_path):
raise ValueError(f"{local_store_path} is not a directory")
if Path(root) not in Path(local_store_path).parents:
raise ValueError(
f"{str(Path(root))} is not among the local_store_path parents: "
f"{str(Path(local_store_path).parents)}"
)
payload = {
"regex_filter": regex_filter,
"use_blob_urls": use_blob_urls,
"path": local_store_path,
"presign": False,
"presign_ttl": 1,
"title": title,
"description": description,
"project": self.id,
}
response = self.make_request(
"POST", f"/api/storages/localfiles?project={self.id}", json=payload
)
return response.json()
def sync_import_storage(self, storage_type, storage_id):
"""Synchronize Import (Source) Cloud Storage.
Parameters
----------
storage_type: string
Specify the type of the storage container. See ProjectStorage for available types.
storage_id: int
Specify the storage ID of the storage container. See get_import_storages() to get ids.
Returns
-------
dict:
containing the same fields as in the original storage request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
last_sync: str
Time last sync finished, can be empty.
last_sync_count: int
Number of tasks synced in the last sync
"""
# sync was originally implemented in the Client class; kept here for compatibility
response = self.make_request(
"POST", f"/api/storages/{storage_type}/{str(storage_id)}/sync"
)
return response.json()
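# --- Example (sketch) ---
# Trigger a sync for a previously connected source storage, using the
# ProjectStorage enum defined at the top of this module:
#
#     storage = project.connect_s3_import_storage(bucket="my-bucket")
#     project.sync_import_storage(ProjectStorage.S3.value, storage["id"])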
def sync_export_storage(self, storage_type, storage_id):
"""Synchronize Export (Target) Cloud Storage.
Parameters
----------
storage_type: string
Specify the type of the storage container. See ProjectStorage for available types.
storage_id: int
Specify the storage ID of the storage container. See get_export_storages() to get ids.
Returns
-------
dict:
containing the same fields as in the original storage request and:
id: int
Storage ID
type: str
Type of storage
created_at: str
Creation time
other fields:
See more https://api.labelstud.io/#tag/Storage:S3/operation/api_storages_export_s3_sync_create
"""
response = self.make_request(
"POST", f"/api/storages/export/{storage_type}/{str(storage_id)}/sync"
)
return response.json()
def get_import_storages(self):
"""Get Import (Source) Cloud Storage.
Returns
-------
list of dicts:
List of dicts with source storages. Each dict consists of these fields:
id : int
A unique integer value identifying this storage.
type : str
The type of the storage. Default is "s3".
synchronizable : bool
Indicates if the storage is synchronizable. Default is True.
presign : bool
Indicates if the storage generates presigned URLs. Default is True.
last_sync : str or None
The last sync finished time. Can be None.
last_sync_count : int or None
The count of tasks synced last time. Can be None.
last_sync_job : str or None
The last sync job ID. Can be None.
status : str
The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed".
traceback : str or None
The traceback report for the last failed sync. Can be None.
meta : dict or None
Meta and debug information about storage processes. Can be None.
title : str or None
The title of the cloud storage. Can be None.
description : str or None
The description of the cloud storage. Can be None.
created_at : str
The creation time of the storage.
bucket : str or None
The S3 bucket name. Can be None.
prefix : str or None
The S3 bucket prefix. Can be None.
regex_filter : str or None
The cloud storage regex for filtering objects. Can be None.
use_blob_urls : bool
Indicates if objects are interpreted as BLOBs and generate URLs.
aws_access_key_id : str or None
The AWS_ACCESS_KEY_ID. Can be None.
aws_secret_access_key : str or None
The AWS_SECRET_ACCESS_KEY. Can be None.
aws_session_token : str or None
The AWS_SESSION_TOKEN. Can be None.
aws_sse_kms_key_id : str or None
The AWS SSE KMS Key ID. Can be None.
region_name : str or None
The AWS Region. Can be None.
s3_endpoint : str or None
The S3 Endpoint. Can be None.
presign_ttl : int
The presigned URLs TTL (in minutes).
recursive_scan : bool
Indicates if a recursive scan over the bucket content is performed.
glob_pattern : str or None
The glob pattern for syncing from bucket. Can be None.
synced : bool
Flag indicating if the dataset has been previously synced or not.
"""
response = self.make_request("GET", f"/api/storages/?project={self.id}")
return response.json()
def get_export_storages(self):
"""Get Export (Target) Cloud Storage.
Returns
-------
list of dicts:
List of dicts with target storages
-------
Each dict consists of these fields:
id : int
A unique integer value identifying this storage.
type : str
The type of the storage. Default is "s3".
synchronizable : bool
Indicates if the storage is synchronizable. Default is True.
last_sync : str or None
The last sync finished time. Can be None.
last_sync_count : int or None
The count of tasks synced last time. Can be None.
last_sync_job : str or None
The last sync job ID. Can be None.
status : str
The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed".
traceback : str or None
The traceback report for the last failed sync. Can be None.
meta : dict or None
Meta and debug information about storage processes. Can be None.
title : str or None
The title of the cloud storage. Can be None.
description : str or None
The description of the cloud storage. Can be None.
created_at : str
The creation time of the storage.
can_delete_objects : bool or None
Deletion from storage enabled. Can be None.
bucket : str or None
The S3 bucket name. Can be None.
prefix : str or None
The S3 bucket prefix. Can be None.
regex_filter : str or None
The cloud storage regex for filtering objects. Can be None.
use_blob_urls : bool
Indicates if objects are interpreted as BLOBs and generate URLs.
aws_access_key_id : str or None
The AWS_ACCESS_KEY_ID. Can be None.
aws_secret_access_key : str or None
The AWS_SECRET_ACCESS_KEY. Can be None.
aws_session_token : str or None
The AWS_SESSION_TOKEN. Can be None.
aws_sse_kms_key_id : str or None
The AWS SSE KMS Key ID. Can be None.
region_name : str or None
The AWS Region. Can be None.
s3_endpoint : str or None
The S3 Endpoint. Can be None.
project : int
A unique integer value identifying this project.
"""
response = self.make_request("GET", f"/api/storages/export?project={self.id}")
return response.json()
def _assign_by_sampling(
self,
users: List[int],
assign_function: Callable,
view_id: int = None,
method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM,
fraction: float = 1.0,
overlap: int = 1,
):
"""
Assign tasks to reviewers or annotators via `assign_function`, using the given sampling `method` and `fraction` of tasks, optionally filtered by `view_id`
Parameters
----------
users: List[int]
users' IDs list
assign_function: Callable
Function to assign tasks by list of user IDs
view_id: int
Optional, view ID to filter tasks to assign
method: AssignmentSamplingMethod
Optional, Assignment method
fraction: float
Optional, expresses the size of dataset to be assigned
overlap: int
Optional, expresses the count of assignments for each task
Returns
-------
list[dict]
List of dicts with counter of created assignments
"""
assert len(users) > 0, "Users list is empty."
assert len(users) >= overlap, "Overlap is more than number of users."
# check if users are int and not User objects
if isinstance(users[0], int):
# get users from project
project_users = self.get_members()
# User objects list
users = [user for user in project_users if user.id in users]
final_results = []
# Get tasks to assign
tasks = self.get_tasks(view_id=view_id, only_ids=True)
assert len(tasks) > 0, "Tasks list is empty."
# Choose a fraction of tasks
if fraction != 1.0:
k = int(len(tasks) * fraction)
tasks = sample(tasks, k)
# prepare random list of tasks for overlap > 1
if overlap > 1:
shuffle(tasks)
tasks = tasks * overlap
# Check how many tasks for each user
n_tasks = max(int(len(tasks) // len(users)), 1)
# Assign each user tasks
for user in users:
# check if last chunk of tasks is less than average chunk
if n_tasks > len(tasks):
n_tasks = len(tasks)
# check if last chunk of tasks is more than average chunk + 1
# (covers the rounding issue from the integer division above)
elif n_tasks + 1 == len(tasks) and n_tasks != 1:
n_tasks = n_tasks + 1
if method == AssignmentSamplingMethod.RANDOM and overlap == 1:
sample_tasks = sample(tasks, n_tasks)
elif method == AssignmentSamplingMethod.RANDOM and overlap > 1:
sample_tasks = tasks[:n_tasks]
else:
raise ValueError(f"Sampling method {method} is not allowed")
final_results.append(assign_function([user], sample_tasks))
if overlap > 1:
tasks = tasks[n_tasks:]
else:
tasks = list(set(tasks) - set(sample_tasks))
if len(tasks) == 0:
break
# check if any tasks left
if len(tasks) > 0:
for user in users:
if not tasks:
break
task = tasks.pop()
final_results.append(assign_function([user], [task]))
return final_results
def assign_reviewers_by_sampling(
self,
users: List[int],
view_id: int = None,
method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM,
fraction: float = 1.0,
overlap: int = 1,
):
"""
Behaves similarly to `assign_reviewers()`, but instead of specifying tasks_ids explicitly,
it takes a list of user IDs and an optional view ID and uniformly splits all tasks across reviewers.
Fraction expresses the share of the dataset to be assigned.
Parameters
----------
users: List[int]
users' IDs list
view_id: int
Optional, view ID to filter tasks to assign
method: AssignmentSamplingMethod
Optional, Assignment method
fraction: float
Optional, expresses the size of dataset to be assigned
overlap: int
Optional, expresses the count of assignments for each task
Returns
-------
list[dict]
List of dicts with counter of created assignments
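Example, a minimal usage sketch (user IDs are hypothetical; reviewer workflows are an Enterprise feature):
```
results = project.assign_reviewers_by_sampling(
    users=[101, 102],  # project member IDs
    fraction=0.5,      # assign ~half of the tasks
    overlap=1,         # one reviewer per task
)
```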
"""
return self._assign_by_sampling(
users=users,
assign_function=self.assign_reviewers,
view_id=view_id,
method=method,
fraction=fraction,
overlap=overlap,
)
def assign_annotators_by_sampling(
self,
users: List[int],
view_id: int = None,
method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM,
fraction: float = 1.0,
overlap: int = 1,
):
"""
Behaves similarly to `assign_annotators()`, but instead of specifying tasks_ids explicitly,
it takes a list of user IDs and an optional view ID and splits all tasks across annotators.
Fraction expresses the share of the dataset to be assigned.
Parameters
----------
users: List[int]
users' IDs list
view_id: int
Optional, view ID to filter tasks to assign
method: AssignmentSamplingMethod
Optional, Assignment method
fraction: float
Optional, expresses the size of dataset to be assigned
overlap: int
Optional, expresses the count of assignments for each task
Returns
-------
list[dict]
List of dicts with counter of created assignments
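Example, a minimal usage sketch (user and view IDs are hypothetical):
```
results = project.assign_annotators_by_sampling(
    users=[101, 102, 103],  # project member IDs
    view_id=7,              # only tasks from this Data Manager tab
    overlap=2,              # each task gets two annotators
)
```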
"""
return self._assign_by_sampling(
users=users,
assign_function=self.assign_annotators,
view_id=view_id,
method=method,
fraction=fraction,
overlap=overlap,
)
def export_snapshot_list(self) -> list:
"""
Get list of export snapshots for the current project
Returns
-------
list[dict]
List of dicts with export snapshots and their statuses:
id: int
Export ID
created_at: str
Creation time
status: str
Export status
created_by: dict
User data
finished_at: str
Finished time
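Example, a minimal usage sketch:
```
for snapshot in project.export_snapshot_list():
    print(snapshot["id"], snapshot["status"])
```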
"""
response = self.make_request("GET", f"/api/projects/{self.id}/exports")
return response.json()
def export_snapshot_create(
self,
title: str,
task_filter_options: dict = None,
serialization_options_drafts: bool = True,
serialization_options_predictions: bool = True,
serialization_options_annotations__completed_by: bool = True,
annotation_filter_options_usual: bool = True,
annotation_filter_options_ground_truth: bool = True,
annotation_filter_options_skipped: bool = True,
interpolate_key_frames: bool = False,
) -> dict:
"""
Create a new export snapshot
Parameters
----------
title: str
Export title
task_filter_options: dict
Task filter options, use {"view": tab_id} to apply filter from this tab,
<a href="https://api.labelstud.io/#operation/api_projects_exports_create">check the API parameters for more details</a>
serialization_options_drafts: bool
Expand drafts (False) or include only ID (True)
serialization_options_predictions: bool
Expand predictions (False) or include only ID (True)
serialization_options_annotations__completed_by: bool
Expand the user who completed the annotation (False) or include only the user ID (True)
annotation_filter_options_usual: bool
Include regular annotations (not cancelled and not ground truth)
annotation_filter_options_ground_truth: bool
Include ground truth annotations
annotation_filter_options_skipped: bool
Include skipped annotations
interpolate_key_frames: bool
Interpolate key frames into sequence
Returns
-------
dict:
containing the same fields as in the request and the created export fields:
id: int
Export ID
created_at: str
Creation time
status: str
Export status
created_by: dict
User data
finished_at: str
Finished time
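Example, a minimal usage sketch (the tab ID is hypothetical):
```
export_result = project.export_snapshot_create(
    title="Weekly export",
    task_filter_options={"view": 12},  # export only tasks from tab 12
)
print(export_result["id"], export_result["status"])
```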
"""
if task_filter_options is None:
task_filter_options = {}
payload = {
"title": title,
"serialization_options": {
"drafts": {"only_id": serialization_options_drafts},
"predictions": {"only_id": serialization_options_predictions},
"annotations__completed_by": {
"only_id": serialization_options_annotations__completed_by
},
"interpolate_key_frames": interpolate_key_frames,
},
"task_filter_options": task_filter_options,
"annotation_filter_options": {
"usual": annotation_filter_options_usual,
"ground_truth": annotation_filter_options_ground_truth,
"skipped": annotation_filter_options_skipped,
},
}
response = self.make_request(
"POST",
f"/api/projects/{self.id}/exports?interpolate_key_frames={interpolate_key_frames}",
json=payload,
)
return response.json()
def export(
self,
filters=None,
title="SDK Export",
export_type="JSON",
output_dir=".",
**kwargs,
):
"""
Export tasks from the project with optional filters,
and save the exported data to a specified directory.
This method:
(1) creates a temporary view with the specified filters if they are not None,
(2) creates a new export snapshot using the view ID,
(3) checks the status of the snapshot creation while it's in progress,
(4) and downloads the snapshot file in the specified export format.
(5) After the export, it cleans up by removing the temporary view.
Parameters
----------
filters : data_manager.Filters, dict, optional
Filters to apply when exporting tasks.
If provided, a temporary view is created with these filters.
The format of the filters should match the Label Studio filter options.
Default is None, which means all tasks are exported.
Use label_studio_sdk.data_manager.Filters.create() to create filters,
Example of the filters JSON format:
```json
{
"conjunction": "and",
"items": [
{
"filter": "filter:tasks:id",
"operator": "equal",
"type": "Number",
"value": 1
}
]
}
```
title : str, optional
The title of the export snapshot. Default is 'SDK Export'.
export_type : str, optional
The format of the exported data. It should be one of the formats supported by Label Studio ('JSON', 'CSV', etc.). Default is 'JSON'.
output_dir : str, optional
The directory where the exported file will be saved. Default is the current directory.
kwargs : kwargs, optional
The same parameters as in the export_snapshot_create method.
Returns
-------
dict
containing the status of the export, the filename of the exported file, and the export ID.
filename : str
Path to the downloaded export file
status : int
200 is ok
export_id : int
Export ID, you can retrieve more details about this export using this ID
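Example, a minimal usage sketch using the filter format shown above:
```
result = project.export(
    filters={
        "conjunction": "and",
        "items": [
            {
                "filter": "filter:tasks:id",
                "operator": "equal",
                "type": "Number",
                "value": 1,
            }
        ],
    },
    export_type="CSV",
    output_dir="exports",
)
print(result["filename"])  # path to the downloaded file
```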
"""
# Create a temporary view with the specified filters
if filters:
view = self.create_view(title="Temp SDK export", filters=filters)
task_filter_options = {"view": view["id"]}
else:
task_filter_options = None
view = None
# Create a new export snapshot using the view ID
export_result = self.export_snapshot_create(
title=title,
task_filter_options=task_filter_options,
**kwargs,
)
# Check the status of the snapshot creation
export_id = export_result["id"]
while self.export_snapshot_status(export_id).is_in_progress():
time.sleep(1.0) # Wait until the snapshot is ready
os.makedirs(output_dir, exist_ok=True)
# Download the snapshot file once it's ready
status, filename = self.export_snapshot_download(
export_id, export_type=export_type, path=output_dir
)
# Clean up the view
if view:
self.delete_view(view["id"])
return {"status": status, "filename": filename, "export_id": export_id}
def export_snapshot_status(self, export_id: int) -> ExportSnapshotStatus:
"""
Get export snapshot status by Export ID
Parameters
----------
export_id: int
Existing export ID from the current project. Can be taken from the `id` field of `export_snapshot_list()` results
Returns
-------
`label_studio_sdk.project.ExportSnapshotStatus`
ExportSnapshotStatus.response is dict and contains the following fields:
id: int
Export ID
created_at: str
Creation time
status: str
created, completed, in_progress, failed
created_by: dict
User data
finished_at: str
Finished time
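Example, a minimal polling sketch mirroring what export() does internally (export_id comes from export_snapshot_create()):
```
while project.export_snapshot_status(export_id).is_in_progress():
    time.sleep(1.0)  # wait until the snapshot is ready
```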
"""
response = self.make_request(
"GET", f"/api/projects/{self.id}/exports/{export_id}"
)
return ExportSnapshotStatus(response.json())
def export_snapshot_download(
self, export_id: int, export_type: str = "JSON", path: str = "."
) -> (int, str):
"""
Download the export snapshot file in the provided format
Parameters
----------
export_id: int
Existing export ID from the current project. Can be taken from the `id` field of `export_snapshot_list()` results
export_type: str
Default export_type is JSON.
Specify another format type as referenced in <a href="https://github.com/heartexlabs/label-studio-converter/blob/master/label_studio_converter/converter.py#L32">
the Label Studio converter code</a>.
path: str
Directory where the downloaded file is saved. Default is the current directory.
Returns
-------
Status code for operation and downloaded filename
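Example, a minimal usage sketch (export_id refers to a completed snapshot):
```
status_code, filename = project.export_snapshot_download(
    export_id, export_type="JSON", path="exports"  # the directory must already exist
)
```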
"""
response = self.make_request(
"GET",
f"/api/projects/{self.id}/exports/{export_id}/download?exportType={export_type}",
)
filename = None
if response.status_code == 200:
content_disposition = response.headers.get("Content-Disposition")
if content_disposition:
filename = content_disposition.split("filename=")[-1].strip("\"'")
filename = os.path.basename(filename)
else:
raise LabelStudioException("No filename in response")
with open(os.path.join(path, filename), "wb") as f:
for chk in response:
f.write(chk)
return response.status_code, filename
def export_snapshot_delete(self, export_id: int) -> int:
"""Delete an export file by specified export ID
Parameters
----------
export_id: int
Existing Export ID from current project
Returns
----------
Status code for operation
"""
response = self.make_request(
"DELETE", f"/api/projects/{self.id}/exports/{export_id}"
)
return response.status_code
def get_files_from_tasks(self, tasks: Dict, get_tasks: bool = False):
"""Copy files from tasks to cache folder
Parameters
----------
tasks: Dict
Tasks to download to local storage
get_tasks: bool
Get all tasks from current project
Returns
-------
list
List of filenames
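Example, a minimal usage sketch that caches the files of all project tasks locally:
```
filenames = project.get_files_from_tasks({}, get_tasks=True)
```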
"""
if get_tasks:
tasks = self.get_tasks()
filenames = []
if tasks:
for task in tasks:
for key in task["data"]:
try:
filename = get_local_path(
task["data"][key],
access_token=self.api_key,
hostname=self.url,
)
filenames.append(filename)
except (FileNotFoundError, InvalidSchema, MissingSchema, IOError):
logger.debug(f"Couldn't copy file {task['data'][key]}.")
return filenames
def delete_task(self, task_id: int) -> Response:
"""Delete a task. To remove multiple tasks `use delete_tasks()`.
Parameters
----------
task_id: int
Task id.
"""
assert isinstance(task_id, int), "task_id should be int"
return self.make_request("DELETE", f"/api/tasks/{task_id}")
def delete_tasks(self, task_ids: list) -> Response:
"""Delete multiple tasks by IDs.
Parameters
----------
task_ids: list of int
Task ids.
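Example, a minimal usage sketch (task IDs are hypothetical):
```
project.delete_tasks([101, 102, 103])
```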
"""
assert isinstance(task_ids, list), "task_ids should be list of int"
if not task_ids: # avoid deletion of all tasks when task_ids = []
return Response()
payload = {
"selectedItems": {"all": False, "included": task_ids},
"project": self.id,
}
return self.make_request(
"POST", f"/api/dm/actions?project={self.id}&id=delete_tasks", json=payload
)
def delete_all_tasks(self, excluded_ids: list = None) -> Response:
"""Delete all tasks from the project.
Parameters
----------
excluded_ids: list of int
Task ids that should be excluded from the deletion.
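Example, a minimal usage sketch that keeps one hypothetical task and deletes the rest:
```
project.delete_all_tasks(excluded_ids=[101])
```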
"""
assert (
isinstance(excluded_ids, list) or excluded_ids is None
), "excluded_ids should be list of int or None"
if excluded_ids is None:
excluded_ids = []
payload = {
"selectedItems": {"all": True, "excluded": excluded_ids},
"project": self.id,
}
return self.make_request(
"POST", f"/api/dm/actions?project={self.id}&id=delete_tasks", json=payload
)
Classes
class AssignmentSamplingMethod (value, names=None, *, module=None, qualname=None, type=None, start=1)
An enumeration.
Constants
RANDOM
class ExportSnapshotStatus (response)
Constants
COMPLETED
Export snapshot was created and can be downloaded
CREATED
Export snapshot is created
FAILED
Export snapshot failed with errors
IN_PROGRESS
Export snapshot is in progress
Methods
def is_completed(self)
Export snapshot was created and can be downloaded
def is_created(self)
Export snapshot is created
def is_failed(self)
Export snapshot failed with errors
def is_in_progress(self)
Export snapshot is in progress
class LabelStudioAttributeError (*args, **kwargs)
Common base class for all non-exit exceptions.
class LabelStudioException (*args, **kwargs)
Common base class for all non-exit exceptions.
Subclasses
LabelStudioAttributeError
class Project (*args, **kwargs)
Initialize project class.
Parameters
source code 浏览Git
class Project(Client): def __init__(self, *args, **kwargs): """Initialize project class. Parameters ---------- """ super(Project, self).__init__(*args, **kwargs) self.params = {} def __getattr__(self, item): return self._get_param(item) @property def parsed_label_config(self): """Get the parsed labeling configuration for the project. You can use this to more easily construct annotation or prediction results based on your labeling configuration. Returns ------- dict Object and control tags from the project labeling configuration. Example with structured configuration of the form: ``` { "<ControlTag>.name": { "type": "ControlTag", "to_name": ["<ObjectTag1>.name", "<ObjectTag2>.name"], "inputs: [ {"type": "ObjectTag1", "value": "<ObjectTag1>.value"}, {"type": "ObjectTag2", "value": "<ObjectTag2>.value"} ], "labels": ["Label1", "Label2", "Label3"] } ``` `"labels"` are taken from "alias" attribute if it exists, else "value" """ return parse_config(self.label_config) def get_members(self): """Get members from this project. Parameters ---------- Returns ------- list of `label_studio_sdk.users.User` """ from .users import User assert self.is_enterprise, ( "Project members are available in the Enterprise edition of Label Studio only. " "Use get_users() instead." ) response = self.make_request("GET", f"/api/projects/{self.id}/members") users = [] for user_data in response.json(): user_data["client"] = self users.append(User(**user_data)) return users def add_member(self, user): """Add a user to a project. Parameters ---------- user: User Returns ------- dict Dict with created member """ payload = {"user": user.id} response = self.make_request( "POST", f"/api/projects/{self.id}/members", json=payload ) return response.json() def assign_annotators(self, users, tasks_ids): """Assign annotators to tasks Parameters ---------- users: list of user's objects tasks_ids: list of integer task IDs to assign users to Returns ------- dict Dict with counter of created assignments """ final_response = {"assignments": 0} users_ids = [user.id for user in users] # Assign tasks to users with batches for c in chunk(tasks_ids, 1000): logger.debug(f"Starting assignment for: {users_ids}") payload = { "users": users_ids, "selectedItems": {"all": False, "included": c}, "type": "AN", } response = self.make_request( "POST", f"/api/projects/{self.id}/tasks/assignees", json=payload ) final_response["assignments"] += response.json()["assignments"] return final_response def delete_annotators_assignment(self, tasks_ids): """Remove all assigned annotators for tasks Parameters ---------- tasks_ids: list of int Returns ------- dict Dict with counter of deleted annotator assignments """ payload = {"selectedItems": {"all": False, "included": tasks_ids}} response = self.make_request( "POST", f"/api/dm/actions?id=delete_annotators&project={self.id}", json=payload, ) return response.json() def delete_reviewers_assignment(self, tasks_ids): """Clear all assigned reviewers for tasks Parameters ---------- tasks_ids: list of int Returns ------- dict Dict with counter of deleted reviewer assignments """ payload = {"selectedItems": {"all": False, "included": tasks_ids}} response = self.make_request( "POST", f"/api/dm/actions?id=delete_reviewers&project={self.id}", json=payload, ) return response.json() def assign_reviewers(self, users, tasks_ids): """Assign reviewers to tasks Parameters ---------- users: list of user's objects tasks_ids: list of integer task IDs to assign reviewers to Returns ------- dict Dict with counter of created assignments 
""" payload = { "users": [user.id for user in users], "selectedItems": {"all": False, "included": tasks_ids}, "type": "RE", } response = self.make_request( "POST", f"/api/projects/{self.id}/tasks/assignees", json=payload ) return response.json() def _get_param(self, param_name): if param_name not in self.params: self.update_params() if param_name not in self.params: raise LabelStudioAttributeError( f'Project "{param_name}" field is not set' ) return self.params[param_name] def get_params(self): """Get all available project parameters. Returns -------- dict containing all following params: title: str Project name. description: str Project description label_config: str Label config in XML format. expert_instruction: str Labeling instructions in HTML format show_instruction: bool Whether to display instructions to annotators before they start show_skip_button: bool Whether to show a skip button in the Label Studio UI and let annotators skip the task enable_empty_annotation: bool Allow annotators to submit empty annotations show_annotation_history: bool Show annotation history to annotator organization: int Organization ID color: str Color to decorate the project card in the Label Studio UI maximum_annotations: int Maximum number of annotations for one task. If the number of annotations per task is equal or greater to this value, the task is finished and is_labeled=True is set. (Enterprise only) is_published: bool Whether or not the project is published to annotators (Enterprise only) model_version: str Machine learning model version for predictions or pre-annotations is_draft: bool Whether or not the project is in the middle of being created (Enterprise only) created_by: object Details about the user that created the project min_annotations_to_start_training: int Minimum number of completed tasks after which model training is started show_collab_predictions: bool Whether to show model predictions to the annotator, allowing them to collaborate with the ML model sampling: str Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only. Enum: "Sequential sampling" "Uniform sampling" "Uncertainty sampling" show_ground_truth_first: bool Whether to show tasks with ground truth annotations first (Enterprise only) show_overlap_first: bool Whether to show tasks with overlap first (Enterprise only) overlap_cohort_percentage: int Percentage of tasks that must be annotated multiple times. (Enterprise only) task_data_login: str User credentials for accessing task data. (Enterprise only) task_data_password: str Password credentials for accessing task data. (Enterprise only) control_weights: object Weights for control tags used when calculating agreement metrics. (Enterprise only) evaluate_predictions_automatically: bool Retrieve and display predictions when loading a task """ response = self.make_request("GET", f"/api/projects/{self.id}") return response.json() def get_model_versions(self): """Get the list of available ML model versions from pre-annotations or connected ML backends. Returns ------- list of strings Model versions """ response = self.make_request("GET", f"/api/projects/{self.id}/model-versions") return response.json() def update_params(self): """Get [all available project parameters](#label_studio_sdk.project.Project.get_params) and cache them.""" self.params = self.get_params() def start_project(self, **kwargs): """Create a new labeling project in Label Studio. Parameters ---------- title: str Project name. 
description: str Project description label_config: str Label config in XML format. expert_instruction: str Labeling instructions in HTML format show_instruction: bool Whether to display instructions to annotators before they start show_skip_button: bool Whether to show a skip button in the Label Studio UI and let annotators skip the task enable_empty_annotation: bool Allow annotators to submit empty annotations show_annotation_history: bool Show annotation history to annotator organization: int Organization ID color: str Color to decorate the project card in the Label Studio UI maximum_annotations: int Maximum number of annotations for one task. If the number of annotations per task is equal or greater to this value, the task is finished and is_labeled=True is set. (Enterprise only) is_published: bool Whether or not the project is published to annotators (Enterprise only) model_version: str Machine learning model version for predictions or pre-annotations is_draft: bool Whether or not the project is in the middle of being created (Enterprise only) created_by: object Details about the user that created the project min_annotations_to_start_training: int Minimum number of completed tasks after which model training is started show_collab_predictions: bool Whether to show model predictions to the annotator, allowing them to collaborate with the ML model sampling: str Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only. Enum: "Sequential sampling" "Uniform sampling" "Uncertainty sampling" show_ground_truth_first: bool Whether to show tasks with ground truth annotations first (Enterprise only) show_overlap_first: bool Whether to show tasks with overlap first (Enterprise only) overlap_cohort_percentage: int Percentage of tasks that must be annotated multiple times. (Enterprise only) task_data_login: str User credentials for accessing task data. (Enterprise only) task_data_password: str Password credentials for accessing task data. (Enterprise only) control_weights: object Weights for control tags used when calculating agreement metrics. (Enterprise only) evaluate_predictions_automatically: bool Retrieve and display predictions when loading a task Raises LabelStudioException in case of errors. """ response = self.make_request("POST", "/api/projects", json=kwargs) if response.status_code == 201: self.params = response.json() else: raise LabelStudioException("Project not created") @classmethod def _create_from_id(cls, client, project_id, params=None): project = cls( url=client.url, api_key=client.api_key, session=client.session, extra_headers=client.headers, versions=client.versions, make_request_raise=client.make_request_raise, ) if params and isinstance(params, dict): # TODO: validate project parameters project.params = params project.params["id"] = project_id return project @classmethod def get_from_id(cls, client, project_id) -> "Project": """Class factory to create a project instance from an existing project ID. Parameters ---------- client: class Client project_id: int Project ID Returns ------- `Project` """ project = cls._create_from_id(client, project_id) project.update_params() return project def import_tasks(self, tasks, preannotated_from_fields: List = None): """Import JSON-formatted labeling tasks. Tasks can be unlabeled or contain predictions. 
Parameters ---------- tasks: list of dicts | dict | path to file Tasks in <a href="https://labelstud.io/guide/tasks.html#Basic-Label-Studio-JSON-format"> Label Studio JSON format</a> preannotated_from_fields: list of strings Turns flat task JSON formatted like: `{"column1": value, "column2": value}` into Label Studio prediction data format: `{"data": {"column1"..}, "predictions": [{..."column2"}]` Useful when all your data is stored in tabular format with one column dedicated to model predictions. Returns ------- list of int Imported task IDs """ params = {"return_task_ids": "1"} if preannotated_from_fields: params["preannotated_from_fields"] = ",".join(preannotated_from_fields) if isinstance(tasks, (list, dict)): response = self.make_request( method="POST", url=f"/api/projects/{self.id}/import", json=tasks, params=params, timeout=(10, 600), ) elif isinstance(tasks, (str, Path)): # try import from file if not os.path.isfile(tasks): raise LabelStudioException(f"Not found import tasks file {tasks}") with open(tasks, mode="rb") as f: response = self.make_request( method="POST", url=f"/api/projects/{self.id}/import", files={"file": f}, params=params, timeout=(10, 600), ) else: raise TypeError( f'Not supported type provided as "tasks" argument: {type(tasks)}' ) response = response.json() if "import" in response: # check import status timeout = 300 fibonacci_backoff = [1, 1] start_time = time.time() while True: import_status = self.make_request( method="GET", url=f'/api/projects/{self.id}/imports/{response["import"]}', ).json() if import_status["status"] == "completed": return import_status["task_ids"] if import_status["status"] == "failed": raise LabelStudioException(import_status["error"]) if time.time() - start_time >= timeout: raise LabelStudioException("Import timeout") time.sleep(fibonacci_backoff[0]) fibonacci_backoff = [ fibonacci_backoff[1], fibonacci_backoff[0] + fibonacci_backoff[1], ] return response["task_ids"] def export_tasks( self, export_type: str = "JSON", download_all_tasks: bool = False, download_resources: bool = False, ids: Optional[List[int]] = None, export_location: Optional[str] = None, ) -> Union[list, pathlib.Path]: """Export annotated tasks. Parameters ---------- export_type: string Default export_type is JSON. Specify another format type as referenced in <a href="https://github.com/heartexlabs/label-studio-converter/blob/master/label_studio_converter/converter.py#L32"> the Label Studio converter code</a>. download_all_tasks: bool Default download_all_tasks is False. If true, download all tasks regardless of status. If false, download only annotated tasks. download_resources: bool Default download_resources is False. If true, download all resource files such as images, audio, and others relevant to the tasks. ids: list of ints Optional, specify a list of task IDs to retrieve only the details for those tasks. export_location: str or path Optional, specify a location to save the export to, this is mandatory for the YOLO export. A pathlib.Path object will be returned instead of the deserialized json. 
Returns ------- list of dicts if export_location is None Tasks with annotations pathlib.Path if export_location is not None Path to the export """ params = { "exportType": export_type, "download_all_tasks": download_all_tasks, "download_resources": download_resources, } if ids: params["ids"] = ids response = self.make_request( method="GET", url=f"/api/projects/{self.id}/export", params=params ) if export_location is None: if "JSON" not in export_type.upper(): raise ValueError( f"{export_type} export type requires an export location to be specified" ) return response.json() export_path = pathlib.Path(export_location) # ensure that parent location exists even if it is in some subdirectory export_path.parent.mkdir(parents=True, exist_ok=True) with open(export_path, "wb") as out_file: for chunk in response.iter_content( chunk_size=1024 ): # 1 kib seems reasonable out_file.write(chunk) return export_path def set_params(self, **kwargs): """Low level function to set project parameters.""" response = self.make_request("PATCH", f"/api/projects/{self.id}", json=kwargs) assert response.status_code == 200 def set_sampling(self, sampling: ProjectSampling): """Set the project sampling method for the labeling stream.""" self.set_params(sampling=sampling.value) def set_published(self, is_published: bool): """Set the project publication state. (Enterprise only) Parameters ---------- is_published: bool Project publication state for reviewers and annotators """ self.set_params(is_published=is_published) def set_model_version(self, model_version: str): """Set the current model version to use for displaying predictions to annotators, perform uncertainty sampling and annotation evaluations in Label Studio Enterprise, and other operations. Parameters ---------- model_version: string It can be any string you want """ self.set_params(model_version=model_version) def get_tasks( self, filters=None, ordering=None, view_id=None, selected_ids=None, only_ids: bool = False, ): """Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a predefined view ID. Parameters ---------- filters: label_studio_sdk.data_manager.Filters.create() JSON objects representing Data Manager filters. Use `label_studio_sdk.data_manager.Filters.create()` helper to create it. Example: ```json { "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] } ``` ordering: list of label_studio_sdk.data_manager.Column List with <b>one</b> string representing Data Manager ordering. Use `label_studio_sdk.data_manager.Column` helper class. 
Example: ```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order view_id: int View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items selected_ids: list of ints Task IDs only_ids: bool If true, return only task IDs Returns ------- list Task list with task data, annotations, predictions and other fields from the Data Manager """ page = 1 result = [] data = {} while not data.get("end_pagination"): try: data = self.get_paginated_tasks( filters=filters, ordering=ordering, view_id=view_id, selected_ids=selected_ids, only_ids=only_ids, page=page, page_size=100, ) result += data["tasks"] page += 1 except LabelStudioException as e: logger.debug(f"Error during pagination: {e}") break return result def get_paginated_tasks( self, filters=None, ordering=None, view_id=None, selected_ids=None, page: int = 1, page_size: int = 100, only_ids: bool = False, resolve_uri: bool = True, ): """Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a predefined view ID. For non-existent pages it returns 404 error. Parameters ---------- filters: label_studio_sdk.data_manager.Filters.create() JSON objects representing Data Manager filters. Use `label_studio_sdk.data_manager.Filters.create()` helper to create it. Example: { "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] } ordering: list of label_studio_sdk.data_manager.Column List with <b>one</b> string representing Data Manager ordering. Use `label_studio_sdk.data_manager.Column` helper class. Example: ```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order view_id: int View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items selected_ids: list of ints Task IDs page: int Page. Default is 1. page_size: int Page size. Default is 100, to retrieve all tasks in the project you can use get_tasks(). 
only_ids: bool If true, return only task IDs resolve_uri: bool Resolve pre-sign urls to https links Returns ------- dict Example: { "tasks": [{...}], "total_annotations": 50, "total_predictions": 100, "total": 100 } tasks: list of dicts Tasks with task data, annotations, predictions and other fields from the Data Manager total: int Total number of tasks in filtered result total_annotations: int Total number of annotations in filtered tasks total_predictions: int Total number of predictions in filtered tasks """ query = { "filters": filters, "ordering": ordering or [], "selectedItems": ( {"all": False, "included": selected_ids} if selected_ids else {"all": True, "excluded": []} ), } params = { "project": self.id, "page": page, "page_size": page_size, "view": view_id, "query": json.dumps(query), "fields": "all", "resolve_uri": resolve_uri, } if only_ids: params["include"] = "id" response = self.make_request( "GET", "/api/tasks", params, raise_exceptions=False ) # we'll get 404 from API on empty page if response.status_code == 404: return {"tasks": [], "end_pagination": True} elif response.status_code != 200: self.log_response_error(response) try: response.raise_for_status() except HTTPError as e: raise LabelStudioException(f"Error loading tasks: {e}") data = response.json() tasks = data["tasks"] if only_ids: data["tasks"] = [task["id"] for task in tasks] return data def get_tasks_ids(self, *args, **kwargs): """Same as `label_studio_sdk.project.Project.get_tasks()` but returns only task IDs.""" kwargs["only_ids"] = True return self.get_tasks(*args, **kwargs) def get_paginated_tasks_ids(self, *args, **kwargs): """Same as `label_studio_sdk.project.Project.get_paginated_tasks()` but returns only task IDs. """ kwargs["only_ids"] = True return self.get_paginated_tasks(*args, **kwargs) def get_views(self): """Get all views related to the project Returns ------- list List of view dicts The each dict contains the following fields: id: int View ID project: int Project ID user: int User ID who created this tab data: dict Filters, orderings and other visual settings """ response = self.make_request("GET", f"/api/dm/views?project={self.id}") return response.json() def create_view(self, filters, ordering=None, title="Tasks"): """Create view Parameters ---------- filters: dict Specify the filters(`label_studio_sdk.data_manager.Filters`) of the view ordering: list of label_studio_sdk.data_manager.Column List with <b>one</b> string representing Data Manager ordering. Use `label_studio_sdk.data_manager.Column` helper class. Example: ```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order title: str Tab name Returns ------- dict: dict with created view """ data = { "project": self.id, "data": {"title": title, "ordering": ordering, "filters": filters}, } response = self.make_request("POST", "/api/dm/views", json=data) return response.json() def delete_view(self, view_id): """Delete view Parameters ---------- view_id: int View ID Returns ------- dict: dict with deleted view """ response = self.make_request("DELETE", f"/api/dm/views/{view_id}") return @property def tasks(self): """Retrieve all tasks from the project. This call can be very slow if the project has a lot of tasks.""" return self.get_tasks() @property def tasks_ids(self): """IDs for all tasks for a project. This call can be very slow if the project has lots of tasks.""" return self.get_tasks_ids() def get_labeled_tasks(self, only_ids=False): """Retrieve all tasks that have been completed, i.e. 
where requested number of annotations have been created Parameters ---------- only_ids: bool Return only task IDs. Returns ------- list List of task dicts, the same as in `get_tasks`. """ return self.get_tasks( filters={ "conjunction": "and", "items": [ { "filter": "filter:tasks:completed_at", "operator": "empty", "value": False, "type": "Datetime", } ], }, only_ids=only_ids, ) def get_labeled_tasks_ids(self): """Retrieve all task IDs for completed tasks, i.e. where requested number of annotations have been created Returns ------- list List of task IDs """ return self.get_labeled_tasks(only_ids=True) def get_unlabeled_tasks(self, only_ids=False): """Retrieve all tasks that are <b>not</b> completed. If using Label Studio Enterprise, this can include tasks that have been labeled one or more times, but not the full number of times defined in the project labeling settings. Parameters ---------- only_ids: bool Return only task IDs Returns ------- list List of task dicts, the same as in `get_tasks`. """ return self.get_tasks( filters={ "conjunction": "and", "items": [ { "filter": "filter:tasks:completed_at", "operator": "empty", "value": True, "type": "Datetime", } ], }, only_ids=only_ids, ) def get_unlabeled_tasks_ids(self): """Retrieve all task IDs for tasks that are <b>not</b> completed. If using Label Studio Enterprise, this can include tasks that have been labeled one or more times, but not the full number of times defined in the project labeling settings. Returns ------- list List of task IDs """ return self.get_unlabeled_tasks(only_ids=True) def get_task(self, task_id): """Get specific task by ID. Parameters ---------- task_id: int Task ID you want to retrieve Returns ------- dict: dict of task data containing all initial data and annotation results in [Label Studio JSON format](https://labelstud.io/guide/tasks.html#Basic-Label-Studio-JSON-format) ``` id: int Task ID predictions: dict Predictions object annotations: dict Annotations object drafts: dict Drafts object data: object User imported or uploaded data for a task. Data is formatted according to the project label config. meta: object Meta is user imported (uploaded) data and can be useful as input for an ML Backend for embeddings, advanced vectors, and other info. It is passed to ML during training/predicting steps. (Deprecated) created_at: str Date time string representing the time a task was created. updated_at: str Date time string representing the last time a task was updated. is_labeled: bool True if the number of annotations for this task is greater than or equal to the number of maximum_completions for the project. overlap: int Number of distinct annotators that processed the current task. project: int Project ID for this task file_upload: str Uploaded file used as data source for this task ``` """ response = self.make_request("GET", f"/api/tasks/{task_id}") return response.json() def update_task(self, task_id, **kwargs): """Update specific task by ID. Parameters ---------- task_id: int Task ID you want to update kwargs: kwargs parameters List of parameters to update. 
Check all available parameters [here](https://labelstud.io/api#operation/api_tasks_partial_update) Returns ------- dict: Dict with updated task """ response = self.make_request("PATCH", f"/api/tasks/{task_id}", json=kwargs) response.raise_for_status() return response.json() def create_prediction( self, task_id: int, result: Optional[Union[List[Dict], Dict, str]] = None, score: Optional[float] = 0, model_version: Optional[str] = None, ): """Create a prediction for a specific task. Parameters ---------- task_id: int Task ID result: list or dict or str Result in the <a href="https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks"> Label Studio JSON format as for annotations</a>. For the labeling config: <View> <Image name="image" value="$value"/> <Choices name="class_name" toName="image"> <Choice value="Class A"/> <Choice value="Class B"/> </Choices> </View> The following inputs are equivalent, result could be either full `"predictions"`: [{ "from_name": "class_name", "to_name": "image", "type": "choices", "value": { "choices": ["Class A"] } }] or just `"value"` payload {"choices": ["Class A"]} or just the class name: "Class A" score: float Model prediction score model_version: str Any string identifying your model """ data = {"task": task_id, "result": result, "score": score} if model_version is not None: data["model_version"] = model_version response = self.make_request("POST", "/api/predictions", json=data) json = response.json() logger.debug(f"Response: {json}") return json def create_predictions(self, predictions): """Bulk create predictions for tasks. See <a href="https://labelstud.io/guide/predictions.html">more details about pre-annotated tasks</a>. Parameters ---------- predictions: list of dicts List of dicts with predictions in the <a href="https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks"> Label Studio JSON format as for annotations</a>. """ response = self.make_request( "POST", f"/api/projects/{self.id}/import/predictions", json=predictions ) return response.json() def create_annotations_from_predictions(self, model_versions=None): """Create annotations from all predictions that exist for project tasks from specific ML model versions. Parameters ---------- model_versions: list or None Convert predictions with these model versions to annotations. If `None`, all existing model versions are used Returns ------- dict Dict with counter of created predictions """ payload = { "filters": {"conjunction": "and", "items": []}, "model_version": model_versions, "ordering": [], "project": self.id, "selectedItems": {"all": True, "excluded": []}, } response = self.make_request( "POST", "/api/dm/actions", params={"id": "predictions_to_annotations", "project": self.id}, json=payload, ) return response.json() def list_annotations(self, task_id: int) -> List: """List all annotations for a task. Parameters ---------- task_id: int Task ID Returns ------- list of dict: List of annotations objects """ response = self.make_request("GET", f"/api/tasks/{task_id}/annotations") response.raise_for_status() return response.json() def create_annotation(self, task_id: int, **kwargs) -> Dict: """Add annotations to a task like an annotator does. Parameters ---------- task_id: int Task ID you want to update kwargs: kwargs parameters List of parameters to create. Check all available parameters [here](https://labelstud.io/api#operation/api_tasks_annotations_create). Labeling is stored in the `result` field as a list of dicionaries, [{...}, {...}, ...] 
Returns ------- dict: Dict with created annotation """ response = self.make_request( "POST", f"/api/tasks/{task_id}/annotations/", json=kwargs ) response.raise_for_status() return response.json() def get_annotation(self, annotation_id: int) -> dict: """Retrieve a specific annotation for a task using the annotation ID. Parameters ---------- annotation_id: int A unique integer value identifying this annotation. Returns ---------- dict Retreived annotation object """ response = self.make_request("GET", f"/api/annotations/{annotation_id}") response.raise_for_status() return response.json() def update_annotation(self, annotation_id, **kwargs): """Update specific annotation with new annotation parameters, e.g. ``` project.update_annotation(annotation_id=123, ground_truth=True) ``` Parameters ---------- annotation_id: int Existing annotation ID from current project. Could be retrieved from `project.get_tasks()` response kwargs: kwargs parameters List of annotation parameters. Check all available parameters [here](https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks) Returns ------- dict Dict with updated annotation """ response = self.make_request( "PATCH", f"/api/annotations/{annotation_id}", json=kwargs ) response.raise_for_status() return response.json() def delete_annotation(self, annotation_id: int) -> int: """Delete an annotation using the annotation ID. This action can't be undone! Parameters ---------- annotation_id: int A unique integer value identifying this annotation. Returns ---------- int Status code for operation """ response = self.make_request("DELETE", f"/api/annotations/{annotation_id}") response.raise_for_status() return response.status_code def get_predictions_coverage(self): """Prediction coverage stats for all model versions for the project. Returns ------- dict Example: { "2021-01-01": 0.9, "2021-02-01": 0.7 } `0.9` means that 90% of project tasks is covered by predictions with model_version `"2021-01-01"` """ model_versions = self.get_model_versions() params = self.get_params() tasks_number = params["task_number"] coverage = { model_version: count / tasks_number for model_version, count in model_versions.items() } return coverage def get_predictions_conflict(self): raise NotImplementedError def get_predictions_precision(self): raise NotImplementedError def connect_google_import_storage( self, bucket: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, google_application_credentials: Optional[str] = None, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", ): """Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as source storage and import tasks. Parameters ---------- bucket: string Specify the name of the GCS bucket prefix: string Optional, specify the prefix or folder within the GCS bucket with your data regex_filter: string Optional, specify a regex filter to use to match the file types of your data use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. google_application_credentials: string Optional, provide a file with your Google application credentials. If not specified, it will use path stored in `GOOGLE_APPLICATION_CREDENTIALS` environmental variable. Read more about [Google Cloud authentication](https://cloud.google.com/docs/authentication/getting-started) presign: bool Optional, true by default. 
Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your GCS import storage that appears in Label Studio. description: string Optional, specify a description for your GCS import storage. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ if google_application_credentials and os.path.isfile( google_application_credentials ): with open(google_application_credentials) as f: google_application_credentials = f.read() payload = { "bucket": bucket, "project": self.id, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "google_application_credentials": google_application_credentials, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, } response = self.make_request("POST", "/api/storages/gcs", json=payload) return response.json() def connect_google_export_storage( self, bucket: str, prefix: Optional[str] = None, google_application_credentials: Optional[str] = None, title: Optional[str] = "", description: Optional[str] = "", can_delete_objects: bool = False, ): """Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as target storage and export tasks. Parameters ---------- bucket: string Specify the name of the GCS bucket prefix: string Optional, specify the prefix or folder within the GCS bucket to export your data to google_application_credentials: string Optional, provide a file with your Google application credentials. If not specified, it will use path stored in `GOOGLE_APPLICATION_CREDENTIALS` environmental variable. Read more about [Google Cloud authentication](https://cloud.google.com/docs/authentication/getting-started) title: string Optional, specify a title for your GCS export storage that appears in Label Studio. description: string Optional, specify a description for your GCS export storage. can_delete_objects: bool False by default. Specify whether to delete tasks in the GCS bucket if they are deleted in Label Studio. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. 
last_sync_count: int Number of tasks synced in the last sync """ if os.path.isfile(google_application_credentials): with open(google_application_credentials) as f: google_application_credentials = f.read() payload = { "bucket": bucket, "prefix": prefix, "google_application_credentials": google_application_credentials, "title": title, "description": description, "can_delete_objects": can_delete_objects, "project": self.id, } response = self.make_request("POST", "/api/storages/export/gcs", json=payload) return response.json() def connect_s3_import_storage( self, bucket: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, recursive_scan: Optional[bool] = False, ): """Connect an Amazon S3 bucket to Label Studio to use as source storage and import tasks. Parameters ---------- bucket: string Specify the name of the S3 bucket. prefix: string Optional, specify the prefix within the S3 bucket to import your data from. regex_filter: string Optional, specify a regex filter to use to match the file types of your data. use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. presign: bool Optional, true by default. Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your S3 import storage that appears in Label Studio. description: string Optional, specify a description for your S3 import storage. aws_access_key_id: string Optional, specify the access key ID for your bucket. aws_secret_access_key: string Optional, specify the secret access key for your bucket. aws_session_token: string Optional, specify a session token to use to access your bucket. region_name: string Optional, specify the AWS region of your S3 bucket. s3_endpoint: string Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method. recursive_scan: bool Optional, specify whether to perform recursive scan over the bucket content. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. 
last_sync_count: int Number of tasks synced in the last sync """ payload = { "bucket": bucket, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "aws_access_key_id": aws_access_key_id, "aws_secret_access_key": aws_secret_access_key, "aws_session_token": aws_session_token, "region_name": region_name, "s3_endpoint": s3_endpoint, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, "project": self.id, "recursive_scan": recursive_scan, } response = self.make_request("POST", "/api/storages/s3", json=payload) return response.json() def connect_s3s_iam_import_storage( self, role_arn: str, external_id: Optional[str] = None, bucket: Optional[str] = None, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, recursive_scan: Optional[bool] = False, aws_sse_kms_key_id: Optional[str] = None, ): """Create S3 secured import storage with IAM role access. Enterprise only. Parameters ---------- role_arn: string Required, specify the AWS Role ARN to assume. external_id: string or None Optional, specify the external ID to use to assume the role. If None, SDK will call api/organizations/<id> and use external_id from the response. You can find this ID on the organization page in the Label Studio UI. bucket: string Specify the name of the S3 bucket. prefix: string Optional, specify the prefix within the S3 bucket to import your data from. regex_filter: string Optional, specify a regex filter to use to match the file types of your data. use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. presign: bool Optional, true by default. Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your S3 import storage that appears in Label Studio. description: string Optional, specify a description for your S3 import storage. region_name: string Optional, specify the AWS region of your S3 bucket. s3_endpoint: string Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method. recursive_scan: bool Optional, specify whether to perform recursive scan over the bucket content. aws_sse_kms_key_id: string Optional, specify an AWS SSE KMS Key ID for server-side encryption. synchronizable, last_sync, last_sync_count, last_sync_job, status, traceback, meta: Parameters for synchronization details and storage status. Returns ------- dict: containing the response from the API including storage ID and type, among other details. 
""" if external_id is None: organization = self.get_organization() external_id = organization["external_id"] payload = { "bucket": bucket, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, "recursive_scan": recursive_scan, "role_arn": role_arn, "region_name": region_name, "s3_endpoint": s3_endpoint, "aws_sse_kms_key_id": aws_sse_kms_key_id, "project": self.id, "external_id": external_id, } response = self.make_request("POST", "/api/storages/s3s/", json=payload) return response.json() def connect_s3_export_storage( self, bucket: str, prefix: Optional[str] = None, title: Optional[str] = "", description: Optional[str] = "", aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, can_delete_objects: bool = False, ): """Connect an Amazon S3 bucket to Label Studio to use as target storage and export tasks. Parameters ---------- bucket: string Specify the name of the S3 bucket. prefix: string Optional, specify the prefix or folder within the S3 bucket to export your data to. title: string Optional, specify a title for your S3 export storage that appears in Label Studio. description: string Optional, specify a description for your S3 export storage. aws_access_key_id: string Optional, specify the access key ID for your bucket. aws_secret_access_key: string Optional, specify the secret access key for your bucket. aws_session_token: string Optional, specify a session token to use to access your bucket. region_name: string Optional, specify the AWS region of your S3 bucket. s3_endpoint: string Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method. can_delete_objects: bool False by default. Specify whether to delete tasks in the S3 bucket if they are deleted in Label Studio. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ payload = { "bucket": bucket, "prefix": prefix, "aws_access_key_id": aws_access_key_id, "aws_secret_access_key": aws_secret_access_key, "aws_session_token": aws_session_token, "region_name": region_name, "s3_endpoint": s3_endpoint, "title": title, "description": description, "can_delete_objects": can_delete_objects, "project": self.id, } response = self.make_request("POST", "/api/storages/export/s3", json=payload) return response.json() def connect_azure_import_storage( self, container: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", account_name: Optional[str] = None, account_key: Optional[str] = None, ): """Connect a Microsoft Azure BLOB storage container to Label Studio to use as source storage and import tasks. Parameters ---------- container: string Specify the name of the Azure container. prefix: string Optional, specify the prefix or folder within the Azure container with your data. regex_filter: string Optional, specify a regex filter to use to match the file types of your data. use_blob_urls: bool Optional, true by default. 
Specify whether your data is raw image or video data, or JSON tasks. presign: bool Optional, true by default. Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your Azure import storage that appears in Label Studio. description: string Optional, specify a description for your Azure import storage. account_name: string Optional, specify the name of the account with access to the container. account_key: string Optional, specify the key for the account with access to the container. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ payload = { "container": container, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "account_name": account_name, "account_key": account_key, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, "project": self.id, } response = self.make_request("POST", "/api/storages/azure", json=payload) return response.json() def connect_azure_export_storage( self, container: str, prefix: Optional[str] = None, title: Optional[str] = "", description: Optional[str] = "", account_name: Optional[str] = None, account_key: Optional[str] = None, can_delete_objects: bool = False, ): """Connect Microsoft Azure BLOB storage to Label Studio to use as target storage and export tasks. Parameters ---------- container: string Specify the name of the Azure storage container. prefix: string Optional, specify the prefix or folder within the Azure container to export your data to. title: string Optional, specify a title for your Azure export storage that appears in Label Studio. description: string Optional, specify a description for your Azure export storage. can_delete_objects: bool False by default. Specify whether to delete tasks in the Azure container if they are deleted in Label Studio. account_name: string Optional, specify the name of the account with access to the container. account_key: string Optional, specify the key for the account with access to the container. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ payload = { "container": container, "prefix": prefix, "account_name": account_name, "account_key": account_key, "title": title, "description": description, "can_delete_objects": can_delete_objects, "project": self.id, } response = self.make_request("POST", "/api/storages/export/azure", json=payload) return response.json() def connect_local_import_storage( self, local_store_path: [str], regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, title: Optional[str] = "", description: Optional[str] = "", ): """Connect a Local storage to Label Studio to use as source storage and import tasks. Parameters ---------- local_store_path: string Path to declare as local storage. regex_filter: string Optional, specify a regex filter to use to match the file types of your data use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. 
title: string Optional, specify a title for your local import storage that appears in Label Studio. description: string Optional, specify a description for your local import storage. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ if "LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT" not in os.environ: raise ValueError( "To use connect_local_import_storage() you should set " "LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT environment variable, " "read more: https://labelstud.io/guide/storage.html#Prerequisites-2" ) root = os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"] if not os.path.isdir(local_store_path): raise ValueError(f"{local_store_path} is not a directory") if (Path(root) in Path(local_store_path).parents) is False: raise ValueError( f"{str(Path(root))} is not present in local_store_path parents: " f"{str(Path(local_store_path).parents)}" ) payload = { "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "path": local_store_path, "presign": False, "presign_ttl": 1, "title": title, "description": description, "project": self.id, } response = self.make_request( "POST", f"/api/storages/localfiles?project={self.id}", json=payload ) return response.json() def sync_import_storage(self, storage_type, storage_id): """Synchronize Import (Source) Cloud Storage. Parameters ---------- storage_type: string Specify the type of the storage container. See ProjectStorage for available types. storage_id: int Specify the storage ID of the storage container. See get_import_storages() to get ids. Returns ------- dict: containing the same fields as in the original storage request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ # sync was originally implemented in the Client class; kept here for compatibility response = self.make_request( "POST", f"/api/storages/{storage_type}/{str(storage_id)}/sync" ) return response.json() def sync_export_storage(self, storage_type, storage_id): """Synchronize Export (Target) Cloud Storage. Parameters ---------- storage_type: string Specify the type of the storage container. See ProjectStorage for available types. storage_id: int Specify the storage ID of the storage container. See get_export_storages() to get ids. Returns ------- dict: containing the same fields as in the original storage request and: id: int Storage ID type: str Type of storage created_at: str Creation time other fields: See more https://api.labelstud.io/#tag/Storage:S3/operation/api_storages_export_s3_sync_create """ response = self.make_request( "POST", f"/api/storages/export/{storage_type}/{str(storage_id)}/sync" ) return response.json() def get_import_storages(self): """Get Import (Source) Cloud Storage. Returns ------- list of dicts: List of dicts with source storages. ------- Each dict consists of these fields: id : int A unique integer value identifying this storage. type : str The type of the storage. Default is "s3". synchronizable : bool Indicates if the storage is synchronizable. Default is True. presign : bool Indicates if the storage is presign. Default is True. last_sync : str or None The last sync finished time. Can be None.
last_sync_count : int or None The count of tasks synced last time. Can be None. last_sync_job : str or None The last sync job ID. Can be None. status : str The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed". traceback : str or None The traceback report for the last failed sync. Can be None. meta : dict or None Meta and debug information about storage processes. Can be None. title : str or None The title of the cloud storage. Can be None. description : str or None The description of the cloud storage. Can be None. created_at : str The creation time of the storage. bucket : str or None The S3 bucket name. Can be None. prefix : str or None The S3 bucket prefix. Can be None. regex_filter : str or None The cloud storage regex for filtering objects. Can be None. use_blob_urls : bool Indicates if objects are interpreted as BLOBs and generate URLs. aws_access_key_id : str or None The AWS_ACCESS_KEY_ID. Can be None. aws_secret_access_key : str or None The AWS_SECRET_ACCESS_KEY. Can be None. aws_session_token : str or None The AWS_SESSION_TOKEN. Can be None. aws_sse_kms_key_id : str or None The AWS SSE KMS Key ID. Can be None. region_name : str or None The AWS Region. Can be None. s3_endpoint : str or None The S3 Endpoint. Can be None. presign_ttl : int The presigned URLs TTL (in minutes). recursive_scan : bool Indicates if a recursive scan over the bucket content is performed. glob_pattern : str or None The glob pattern for syncing from bucket. Can be None. synced : bool Flag indicating if the dataset has been previously synced or not. """ response = self.make_request("GET", f"/api/storages/?project={self.id}") return response.json() def get_export_storages(self): """Get Export (Target) Cloud Storage. Returns ------- list of dicts: List of dicts with target storages ------- Each dict consists of these fields: id : int A unique integer value identifying this storage. type : str The type of the storage. Default is "s3". synchronizable : bool Indicates if the storage is synchronizable. Default is True. last_sync : str or None The last sync finished time. Can be None. last_sync_count : int or None The count of tasks synced last time. Can be None. last_sync_job : str or None The last sync job ID. Can be None. status : str The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed". traceback : str or None The traceback report for the last failed sync. Can be None. meta : dict or None Meta and debug information about storage processes. Can be None. title : str or None The title of the cloud storage. Can be None. description : str or None The description of the cloud storage. Can be None. created_at : str The creation time of the storage. can_delete_objects : bool or None Deletion from storage enabled. Can be None. bucket : str or None The S3 bucket name. Can be None. prefix : str or None The S3 bucket prefix. Can be None. regex_filter : str or None The cloud storage regex for filtering objects. Can be None. use_blob_urls : bool Indicates if objects are interpreted as BLOBs and generate URLs. aws_access_key_id : str or None The AWS_ACCESS_KEY_ID. Can be None. aws_secret_access_key : str or None The AWS_SECRET_ACCESS_KEY. Can be None. aws_session_token : str or None The AWS_SESSION_TOKEN. Can be None. aws_sse_kms_key_id : str or None The AWS SSE KMS Key ID. Can be None. region_name : str or None The AWS Region. Can be None. s3_endpoint : str or None The S3 Endpoint. Can be None. 
project : int A unique integer value identifying this project. """ response = self.make_request("GET", f"/api/storages/export?project={self.id}") return response.json() def _assign_by_sampling( self, users: List[int], assign_function: Callable, view_id: int = None, method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM, fraction: float = 1.0, overlap: int = 1, ): """ Assign tasks to reviewers or annotators with assign_function, sampling with the given method and fraction, optionally taking tasks from view_id Parameters ---------- users: List[int] users' IDs list assign_function: Callable Function to assign tasks by list of user IDs view_id: int Optional, view ID to filter tasks to assign method: AssignmentSamplingMethod Optional, Assignment method fraction: float Optional, expresses the size of dataset to be assigned overlap: int Optional, expresses the count of assignments for each task Returns ------- list[dict] List of dicts with counter of created assignments """ assert len(users) > 0, "Users list is empty." assert len(users) >= overlap, "Overlap is more than number of users." # check if users are int and not User objects if isinstance(users[0], int): # get users from project project_users = self.get_members() # User objects list users = [user for user in project_users if user.id in users] final_results = [] # Get tasks to assign tasks = self.get_tasks(view_id=view_id, only_ids=True) assert len(tasks) > 0, "Tasks list is empty." # Choose a fraction of tasks if fraction != 1.0: k = int(len(tasks) * fraction) tasks = sample(tasks, k) # prepare random list of tasks for overlap > 1 if overlap > 1: shuffle(tasks) tasks = tasks * overlap # Compute how many tasks each user gets n_tasks = max(int(len(tasks) // len(users)), 1) # Assign tasks to each user for user in users: # check if the last chunk of tasks is less than the average chunk if n_tasks > len(tasks): n_tasks = len(tasks) # check if the last chunk of tasks is more than the average chunk + 1 # (covers the rounding issue from the integer division above) elif n_tasks + 1 == len(tasks) and n_tasks != 1: n_tasks = n_tasks + 1 if method == AssignmentSamplingMethod.RANDOM and overlap == 1: sample_tasks = sample(tasks, n_tasks) elif method == AssignmentSamplingMethod.RANDOM and overlap > 1: sample_tasks = tasks[:n_tasks] else: raise ValueError(f"Sampling method {method} is not allowed") final_results.append(assign_function([user], sample_tasks)) if overlap > 1: tasks = tasks[n_tasks:] else: tasks = list(set(tasks) - set(sample_tasks)) if len(tasks) == 0: break # check if any tasks are left over if len(tasks) > 0: for user in users: if not tasks: break task = tasks.pop() final_results.append(assign_function([user], [task])) return final_results def assign_reviewers_by_sampling( self, users: List[int], view_id: int = None, method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM, fraction: float = 1.0, overlap: int = 1, ): """ Behaves like `assign_reviewers()`, but instead of specifying task IDs explicitly, it takes a list of user IDs and an optional view ID and uniformly splits all tasks across reviewers. Fraction expresses the share of the dataset to be assigned. Parameters ---------- users: List[int] users' IDs list view_id: int Optional, view ID to filter tasks to assign method: AssignmentSamplingMethod Optional, Assignment method fraction: float Optional, expresses the size of dataset to be assigned overlap: int Optional, expresses the count of assignments for each task Returns ------- list[dict] List of dicts with counter of created assignments """ return self._assign_by_sampling( users=users,
assign_function=self.assign_reviewers, view_id=view_id, method=method, fraction=fraction, overlap=overlap, ) def assign_annotators_by_sampling( self, users: List[int], view_id: int = None, method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM, fraction: float = 1.0, overlap: int = 1, ): """ Behaves like `assign_annotators()`, but instead of specifying task IDs explicitly, it takes a list of user IDs and an optional view ID and splits all tasks across annotators. Fraction expresses the share of the dataset to be assigned. Parameters ---------- users: List[int] users' IDs list view_id: int Optional, view ID to filter tasks to assign method: AssignmentSamplingMethod Optional, Assignment method fraction: float Optional, expresses the size of dataset to be assigned overlap: int Optional, expresses the count of assignments for each task Returns ------- list[dict] List of dicts with counter of created assignments """ return self._assign_by_sampling( users=users, assign_function=self.assign_annotators, view_id=view_id, method=method, fraction=fraction, overlap=overlap, ) def export_snapshot_list(self) -> list: """ Get the list of export snapshots for the current project ------- Returns ------- list[dict] List of dicts with export snapshots with status: id: int Export ID created_at: str Creation time status: str Export status created_by: dict User data finished_at: str Finished time """ response = self.make_request("GET", f"/api/projects/{self.id}/exports") return response.json() def export_snapshot_create( self, title: str, task_filter_options: dict = None, serialization_options_drafts: bool = True, serialization_options_predictions: bool = True, serialization_options_annotations__completed_by: bool = True, annotation_filter_options_usual: bool = True, annotation_filter_options_ground_truth: bool = True, annotation_filter_options_skipped: bool = True, interpolate_key_frames: bool = False, ) -> dict: """ Create a new export snapshot ---------- Parameters ---------- title: str Export title task_filter_options: dict Task filter options, use {"view": tab_id} to apply the filter from this tab, <a href="https://api.labelstud.io/#operation/api_projects_exports_create">check the API parameters for more details</a> serialization_options_drafts: bool Expand drafts (False) or include only ID (True) serialization_options_predictions: bool Expand predictions (False) or include only ID (True) serialization_options_annotations__completed_by: bool Expand the user in completed_by (False) or include only ID (True) annotation_filter_options_usual: bool Include not cancelled and not ground truth annotations annotation_filter_options_ground_truth: bool Filter ground truth annotations annotation_filter_options_skipped: bool Filter skipped annotations interpolate_key_frames: bool Interpolate key frames into sequence Returns ------- dict: containing the same fields as in the request and the created export fields: id: int Export ID created_at: str Creation time status: str Export status created_by: dict User data finished_at: str Finished time """ if task_filter_options is None: task_filter_options = {} payload = { "title": title, "serialization_options": { "drafts": {"only_id": serialization_options_drafts}, "predictions": {"only_id": serialization_options_predictions}, "annotations__completed_by": { "only_id": serialization_options_annotations__completed_by }, "interpolate_key_frames": interpolate_key_frames, }, "task_filter_options": task_filter_options, "annotation_filter_options": { "usual":
annotation_filter_options_usual, "ground_truth": annotation_filter_options_ground_truth, "skipped": annotation_filter_options_skipped, }, } response = self.make_request( "POST", f"/api/projects/{self.id}/exports?interpolate_key_frames={interpolate_key_frames}", json=payload, ) return response.json() def export( self, filters=None, title="SDK Export", export_type="JSON", output_dir=".", **kwargs, ): """ Export tasks from the project with optional filters, and save the exported data to a specified directory. This method: (1) creates a temporary view with the specified filters if they are not None, (2) creates a new export snapshot using the view ID, (3) checks the status of the snapshot creation while it's in progress, (4) downloads the snapshot file in the specified export format, (5) and, after the export, cleans up and removes the temporary view. Parameters ---------- filters : data_manager.Filters, dict, optional Filters to apply when exporting tasks. If provided, a temporary view is created with these filters. The format of the filters should match the Label Studio filter options. Default is None, which means all tasks are exported. Use label_studio_sdk.data_manager.Filters.create() to create filters. Example of the filters JSON format: ```json { "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] } ``` title : str, optional The title of the export snapshot. Default is 'SDK Export'. export_type : str, optional The format of the exported data. It should be one of the formats supported by Label Studio ('JSON', 'CSV', etc.). Default is 'JSON'. output_dir : str, optional The directory where the exported file will be saved. Default is the current directory. kwargs : kwargs, optional The same parameters as in the export_snapshot_create method. Returns ------- dict containing the status of the export, the filename of the exported file, and the export ID. filename : str Path to the downloaded export file status : int 200 is ok export_id : int Export ID, you can retrieve more details about this export using this ID """ # Create a temporary view with the specified filters if filters: view = self.create_view(title="Temp SDK export", filters=filters) task_filter_options = {"view": view["id"]} else: task_filter_options = None view = None # Create a new export snapshot using the view ID export_result = self.export_snapshot_create( title=title, task_filter_options=task_filter_options, **kwargs, ) # Check the status of the snapshot creation export_id = export_result["id"] while self.export_snapshot_status(export_id).is_in_progress(): time.sleep(1.0) # Wait until the snapshot is ready os.makedirs(output_dir, exist_ok=True) # Download the snapshot file once it's ready status, filename = self.export_snapshot_download( export_id, export_type=export_type, path=output_dir ) # Clean up the view if view: self.delete_view(view["id"]) return {"status": status, "filename": filename, "export_id": export_id} def export_snapshot_status(self, export_id: int) -> ExportSnapshotStatus: """ Get export snapshot status by Export ID ---------- Parameters ---------- export_id: int Existing Export ID from the current project.
Can be referred as id from self.exports() Returns ------- `label_studio_sdk.project.ExportSnapshotStatus` ExportSnapshotStatus.response is dict and contains the following fields: id: int Export ID created_at: str Creation time status: str created, completed, in_progress, failed created_by: dict User data finished_at: str Finished time """ response = self.make_request( "GET", f"/api/projects/{self.id}/exports/{export_id}" ) return ExportSnapshotStatus(response.json()) def export_snapshot_download( self, export_id: int, export_type: str = "JSON", path: str = "." ) -> (int, str): """ Download file with export snapshot in provided format ---------- Parameters ---------- export_id: int Existing Export ID from current project. Can be referred as id from self.exports() export_type: str Default export_type is JSON. Specify another format type as referenced in <a href="https://github.com/heartexlabs/label-studio-converter/blob/master/label_studio_converter/converter.py#L32"> the Label Studio converter code</a>. path: str Default path to store downloaded files Returns ------- Status code for operation and downloaded filename """ response = self.make_request( "GET", f"/api/projects/{self.id}/exports/{export_id}/download?exportType={export_type}", ) filename = None if response.status_code == 200: content_disposition = response.headers.get("Content-Disposition") if content_disposition: filename = content_disposition.split("filename=")[-1].strip("\"'") filename = os.path.basename(filename) else: raise LabelStudioException("No filename in response") with open(os.path.join(path, filename), "wb") as f: for chk in response: f.write(chk) return response.status_code, filename def export_snapshot_delete(self, export_id: int) -> int: """Delete an export file by specified export ID Parameters ---------- export_id: int Existing Export ID from current project Returns ---------- Status code for operation """ response = self.make_request( "DELETE", f"/api/projects/{self.id}/exports/{export_id}" ) return response.status_code def get_files_from_tasks(self, tasks: Dict, get_tasks: bool = False): """Copy files from tasks to cache folder Parameters ---------- tasks: Dict Tasks to download to local storage get_tasks: bool Get all tasks from current project Returns ------- list List of filenames """ if get_tasks: tasks = self.get_tasks() filenames = [] if tasks: for task in tasks: for key in task["data"]: try: filename = get_local_path( task["data"][key], access_token=self.api_key, hostname=self.url, ) filenames.append(filename) except (FileNotFoundError, InvalidSchema, MissingSchema, IOError): logger.debug(f"Couldn't copy file {task['data'][key]}.") return filenames def delete_task(self, task_id: int) -> Response: """Delete a task. To remove multiple tasks `use delete_tasks()`. Parameters ---------- task_id: int Task id. """ assert isinstance(task_id, int), "task_id should be int" return self.make_request("DELETE", f"/api/tasks/{task_id}") def delete_tasks(self, task_ids: list) -> Response: """Delete multiple tasks by IDs. Parameters ---------- task_ids: list of int Task ids. """ assert isinstance(task_ids, list), "task_ids should be list of int" if not task_ids: # avoid deletion of all tasks when task_ids = [] return Response() payload = { "selectedItems": {"all": False, "included": task_ids}, "project": self.id, } return self.make_request( "POST", f"/api/dm/actions?project={self.id}&id=delete_tasks", json=payload ) def delete_all_tasks(self, excluded_ids: list = None) -> Response: """Delete all tasks from the project. 
Parameters ---------- excluded_ids: list of int Task ids that should be excluded from the deletion. """ assert ( isinstance(excluded_ids, list) or excluded_ids is None ), "excluded_ids should be list of int or None" if excluded_ids is None: excluded_ids = [] payload = { "selectedItems": {"all": True, "excluded": excluded_ids}, "project": self.id, } return self.make_request( "POST", f"/api/dm/actions?project={self.id}&id=delete_tasks", json=payload )

Static methods
def get_from_id(client, project_id) -> Project
Source code
@classmethod def get_from_id(cls, client, project_id) -> "Project": """Class factory to create a project instance from an existing project ID. Parameters ---------- client: class Client project_id: int Project ID Returns ------- `Project` """ project = cls._create_from_id(client, project_id) project.update_params() return project
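A minimal usage sketch (the URL, API key, and project ID below are placeholders):

```
# Usage sketch: URL, API key, and project ID are placeholders.
from label_studio_sdk import Client
from label_studio_sdk.project import Project

client = Client(url="http://localhost:8080", api_key="YOUR_API_KEY")
project = Project.get_from_id(client, 1)
print(project.title)
```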
Instance variables
parsed_label_config

Get the parsed labeling configuration for the project. You can use this to more easily construct annotation or prediction results based on your labeling configuration.

Returns

dict - Object and control tags from the project labeling configuration. Example with structured configuration of the form:

```
{
    "<ControlTag>.name": {
        "type": "ControlTag",
        "to_name": ["<ObjectTag1>.name", "<ObjectTag2>.name"],
        "inputs": [
            {"type": "ObjectTag1", "value": "<ObjectTag1>.value"},
            {"type": "ObjectTag2", "value": "<ObjectTag2>.value"}
        ],
        "labels": ["Label1", "Label2", "Label3"]
    }
}
```

`"labels"` are taken from the "alias" attribute if it exists, otherwise from "value".
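For example, a short sketch that walks the parsed configuration (assuming `project` is the instance from the `get_from_id()` sketch above):

```
# Sketch: list each control tag with its type, target object tags, and labels.
for control_name, info in project.parsed_label_config.items():
    print(control_name, info["type"], info["to_name"], info.get("labels"))
```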
Source code

@property def parsed_label_config(self): """Get the parsed labeling configuration for the project. You can use this to more easily construct annotation or prediction results based on your labeling configuration. Returns ------- dict Object and control tags from the project labeling configuration. Example with structured configuration of the form: ``` { "<ControlTag>.name": { "type": "ControlTag", "to_name": ["<ObjectTag1>.name", "<ObjectTag2>.name"], "inputs": [ {"type": "ObjectTag1", "value": "<ObjectTag1>.value"}, {"type": "ObjectTag2", "value": "<ObjectTag2>.value"} ], "labels": ["Label1", "Label2", "Label3"] } ``` `"labels"` are taken from "alias" attribute if it exists, else "value" """ return parse_config(self.label_config)

tasks
Retrieve all tasks from the project. This call can be very slow if the project has a lot of tasks.
Source code
@property def tasks(self): """Retrieve all tasks from the project. This call can be very slow if the project has a lot of tasks.""" return self.get_tasks()

tasks_ids
IDs of all tasks in the project. This call can be very slow if the project has lots of tasks.
Source code
@property def tasks_ids(self): """IDs for all tasks for a project. This call can be very slow if the project has lots of tasks.""" return self.get_tasks_ids()
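A small sketch of the properties (both fetch every task, so prefer `get_tasks()` with filters on large projects):

```
# Sketch: counts all tasks; this can be slow on large projects.
ids = project.tasks_ids
print(f"project has {len(ids)} tasks")
```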
Methods
def add_member(self, user)

Add a user to a project.

Parameters

user: User

Returns

dict - Dict with the created member
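A sketch of adding a member, assuming `client.get_users()` from the client module returns `User` objects:

```
# Sketch: add the first available user to the project
# (assumes client.get_users() returns User objects).
user = client.get_users()[0]
member = project.add_member(user)
print(member)
```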
Source code
def add_member(self, user): """Add a user to a project. Parameters ---------- user: User Returns ------- dict Dict with created member """ payload = {"user": user.id} response = self.make_request( "POST", f"/api/projects/{self.id}/members", json=payload ) return response.json()

def assign_annotators(self, users, tasks_ids)
Assign annotators to tasks.

Parameters

users: list of user objects
tasks_ids: list of integer task IDs to assign users to

Returns

dict - Dict with a counter of created assignments
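A sketch that assigns project members to the first ten tasks; `get_members()` requires Label Studio Enterprise:

```
# Sketch, Enterprise only: assign all members to the first ten tasks.
members = project.get_members()
result = project.assign_annotators(members, project.get_tasks_ids()[:10])
print(result["assignments"])
```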
Source code
def assign_annotators(self, users, tasks_ids): """Assign annotators to tasks Parameters ---------- users: list of user objects tasks_ids: list of integer task IDs to assign users to Returns ------- dict Dict with counter of created assignments """ final_response = {"assignments": 0} users_ids = [user.id for user in users] # Assign tasks to users in batches for c in chunk(tasks_ids, 1000): logger.debug(f"Starting assignment for: {users_ids}") payload = { "users": users_ids, "selectedItems": {"all": False, "included": c}, "type": "AN", } response = self.make_request( "POST", f"/api/projects/{self.id}/tasks/assignees", json=payload ) final_response["assignments"] += response.json()["assignments"] return final_response

def assign_annotators_by_sampling(self, users: List[int], view_id: int = None, method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM, fraction: float = 1.0, overlap: int = 1)
Behaves like `assign_annotators()`, but instead of specifying task IDs explicitly, it takes a list of user IDs and an optional view ID and splits all tasks across annotators. `fraction` expresses the share of the dataset to be assigned.

Parameters

users: List[int] - list of user IDs
view_id: int - Optional, view ID to filter tasks to assign
method: AssignmentSamplingMethod - Optional, assignment method
fraction: float - Optional, expresses the share of the dataset to be assigned
overlap: int - Optional, expresses the count of assignments for each task

Returns

list[dict] - List of dicts with a counter of created assignments
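A sketch with placeholder user IDs, assigning half of the tasks with no overlap:

```
# Sketch: user IDs 101 and 102 are placeholders.
results = project.assign_annotators_by_sampling(
    users=[101, 102],
    fraction=0.5,  # assign 50% of the tasks
    overlap=1,     # each sampled task goes to exactly one annotator
)
```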
Source code
def assign_annotators_by_sampling( self, users: List[int], view_id: int = None, method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM, fraction: float = 1.0, overlap: int = 1, ): """ Behaves like `assign_annotators()`, but instead of specifying task IDs explicitly, it takes a list of user IDs and an optional view ID and splits all tasks across annotators. Fraction expresses the share of the dataset to be assigned. Parameters ---------- users: List[int] users' IDs list view_id: int Optional, view ID to filter tasks to assign method: AssignmentSamplingMethod Optional, Assignment method fraction: float Optional, expresses the size of dataset to be assigned overlap: int Optional, expresses the count of assignments for each task Returns ------- list[dict] List of dicts with counter of created assignments """ return self._assign_by_sampling( users=users, assign_function=self.assign_annotators, view_id=view_id, method=method, fraction=fraction, overlap=overlap, )

def assign_reviewers(self, users, tasks_ids)
Assign reviewers to tasks.

Parameters

users: list of user objects
tasks_ids: list of integer task IDs to assign reviewers to

Returns

dict - Dict with a counter of created assignments
Source code
def assign_reviewers(self, users, tasks_ids): """Assign reviewers to tasks Parameters ---------- users: list of user objects tasks_ids: list of integer task IDs to assign reviewers to Returns ------- dict Dict with counter of created assignments """ payload = { "users": [user.id for user in users], "selectedItems": {"all": False, "included": tasks_ids}, "type": "RE", } response = self.make_request( "POST", f"/api/projects/{self.id}/tasks/assignees", json=payload ) return response.json()

def assign_reviewers_by_sampling(self, users: List[int], view_id: int = None, method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM, fraction: float = 1.0, overlap: int = 1)
Behaves like `assign_reviewers()`, but instead of specifying task IDs explicitly, it takes a list of user IDs and an optional view ID and uniformly splits all tasks across reviewers. `fraction` expresses the share of the dataset to be assigned.

Parameters

users: List[int] - list of user IDs
view_id: int - Optional, view ID to filter tasks to assign
method: AssignmentSamplingMethod - Optional, assignment method
fraction: float - Optional, expresses the share of the dataset to be assigned
overlap: int - Optional, expresses the count of assignments for each task

Returns

list[dict] - List of dicts with a counter of created assignments
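A sketch with placeholder reviewer IDs, giving every task to two of the three reviewers:

```
# Sketch: placeholder reviewer IDs; overlap=2 means two reviewers per task.
results = project.assign_reviewers_by_sampling(users=[201, 202, 203], overlap=2)
```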
Source code
def assign_reviewers_by_sampling( self, users: List[int], view_id: int = None, method: AssignmentSamplingMethod = AssignmentSamplingMethod.RANDOM, fraction: float = 1.0, overlap: int = 1, ): """ Behaves like `assign_reviewers()`, but instead of specifying task IDs explicitly, it takes a list of user IDs and an optional view ID and uniformly splits all tasks across reviewers. Fraction expresses the share of the dataset to be assigned. Parameters ---------- users: List[int] users' IDs list view_id: int Optional, view ID to filter tasks to assign method: AssignmentSamplingMethod Optional, Assignment method fraction: float Optional, expresses the size of dataset to be assigned overlap: int Optional, expresses the count of assignments for each task Returns ------- list[dict] List of dicts with counter of created assignments """ return self._assign_by_sampling( users=users, assign_function=self.assign_reviewers, view_id=view_id, method=method, fraction=fraction, overlap=overlap, )

def connect_azure_export_storage(self, container: str, prefix: Optional[str] = None, title: Optional[str] = '', description: Optional[str] = '', account_name: Optional[str] = None, account_key: Optional[str] = None, can_delete_objects: bool = False)
Connect Microsoft Azure BLOB storage to Label Studio to use as target storage and export tasks.

Parameters

container: string - Specify the name of the Azure storage container.
prefix: string - Optional, specify the prefix or folder within the Azure container to export your data to.
title: string - Optional, specify a title for your Azure export storage that appears in Label Studio.
description: string - Optional, specify a description for your Azure export storage.
can_delete_objects: bool - False by default. Specify whether to delete tasks in the Azure container if they are deleted in Label Studio.
account_name: string - Optional, specify the name of the account with access to the container.
account_key: string - Optional, specify the key for the account with access to the container.

Returns

dict - containing the same fields as in the request and:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty.
last_sync_count: int - Number of tasks synced in the last sync
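A sketch with placeholder account values; `"azure"` as the type string for the subsequent sync is an assumption based on the `/api/storages/export/azure` endpoint this method posts to:

```
# Sketch: placeholder container and credentials.
storage = project.connect_azure_export_storage(
    container="annotations",
    prefix="exports/",
    account_name="mystorageaccount",
    account_key="ACCOUNT_KEY",
)
project.sync_export_storage("azure", storage["id"])  # assumed type string
```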
Source code
def connect_azure_export_storage( self, container: str, prefix: Optional[str] = None, title: Optional[str] = "", description: Optional[str] = "", account_name: Optional[str] = None, account_key: Optional[str] = None, can_delete_objects: bool = False, ): """Connect Microsoft Azure BLOB storage to Label Studio to use as target storage and export tasks. Parameters ---------- container: string Specify the name of the Azure storage container. prefix: string Optional, specify the prefix or folder within the Azure container to export your data to. title: string Optional, specify a title for your Azure export storage that appears in Label Studio. description: string Optional, specify a description for your Azure export storage. can_delete_objects: bool False by default. Specify whether to delete tasks in the Azure container if they are deleted in Label Studio. account_name: string Optional, specify the name of the account with access to the container. account_key: string Optional, specify the key for the account with access to the container. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ payload = { "container": container, "prefix": prefix, "account_name": account_name, "account_key": account_key, "title": title, "description": description, "can_delete_objects": can_delete_objects, "project": self.id, } response = self.make_request("POST", "/api/storages/export/azure", json=payload) return response.json() def connect_azure_import_storage(self, container: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = '', description: Optional[str] = '', account_name: Optional[str] = None, account_key: Optional[str] = None)-
Connect a Microsoft Azure BLOB storage container to Label Studio to use as source storage and import tasks.

Parameters

container: string - Specify the name of the Azure container.
prefix: string - Optional, specify the prefix or folder within the Azure container with your data.
regex_filter: string - Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool - Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
presign: bool - Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int - Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string - Optional, specify a title for your Azure import storage that appears in Label Studio.
description: string - Optional, specify a description for your Azure import storage.
account_name: string - Optional, specify the name of the account with access to the container.
account_key: string - Optional, specify the key for the account with access to the container.

Returns

dict - containing the same fields as in the request and:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty.
last_sync_count: int - Number of tasks synced in the last sync
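A sketch importing raw media from a container (placeholder credentials; `"azure"` as the sync type string is an assumption based on the `/api/storages/azure` endpoint):

```
# Sketch: placeholder container and credentials.
storage = project.connect_azure_import_storage(
    container="raw-images",
    regex_filter=r".*\.png",
    use_blob_urls=True,  # files are media, not JSON task definitions
    account_name="mystorageaccount",
    account_key="ACCOUNT_KEY",
)
project.sync_import_storage("azure", storage["id"])  # assumed type string
```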
Source code
def connect_azure_import_storage( self, container: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", account_name: Optional[str] = None, account_key: Optional[str] = None, ): """Connect a Microsoft Azure BLOB storage container to Label Studio to use as source storage and import tasks. Parameters ---------- container: string Specify the name of the Azure container. prefix: string Optional, specify the prefix or folder within the Azure container with your data. regex_filter: string Optional, specify a regex filter to use to match the file types of your data. use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. presign: bool Optional, true by default. Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your Azure import storage that appears in Label Studio. description: string Optional, specify a description for your Azure import storage. account_name: string Optional, specify the name of the account with access to the container. account_key: string Optional, specify the key for the account with access to the container. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ payload = { "container": container, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "account_name": account_name, "account_key": account_key, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, "project": self.id, } response = self.make_request("POST", "/api/storages/azure", json=payload) return response.json() def connect_google_export_storage(self, bucket: str, prefix: Optional[str] = None, google_application_credentials: Optional[str] = None, title: Optional[str] = '', description: Optional[str] = '', can_delete_objects: bool = False)-
Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as target storage and export tasks.

Parameters

bucket: string - Specify the name of the GCS bucket.
prefix: string - Optional, specify the prefix or folder within the GCS bucket to export your data to.
google_application_credentials: string - Optional, provide a file with your Google application credentials. If not specified, the path stored in the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. Read more about Google Cloud authentication.
title: string - Optional, specify a title for your GCS export storage that appears in Label Studio.
description: string - Optional, specify a description for your GCS export storage.
can_delete_objects: bool - False by default. Specify whether to delete tasks in the GCS bucket if they are deleted in Label Studio.

Returns

dict - containing the same fields as in the request and:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty.
last_sync_count: int - Number of tasks synced in the last sync
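A sketch with a placeholder service-account file; if the argument is omitted, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used:

```
# Sketch: the credentials path is a placeholder.
storage = project.connect_google_export_storage(
    bucket="my-export-bucket",
    prefix="label-studio/",
    google_application_credentials="service-account.json",
)
```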
Source code
def connect_google_export_storage( self, bucket: str, prefix: Optional[str] = None, google_application_credentials: Optional[str] = None, title: Optional[str] = "", description: Optional[str] = "", can_delete_objects: bool = False, ): """Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as target storage and export tasks. Parameters ---------- bucket: string Specify the name of the GCS bucket prefix: string Optional, specify the prefix or folder within the GCS bucket to export your data to google_application_credentials: string Optional, provide a file with your Google application credentials. If not specified, it will use the path stored in the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. Read more about [Google Cloud authentication](https://cloud.google.com/docs/authentication/getting-started) title: string Optional, specify a title for your GCS export storage that appears in Label Studio. description: string Optional, specify a description for your GCS export storage. can_delete_objects: bool False by default. Specify whether to delete tasks in the GCS bucket if they are deleted in Label Studio. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ # guard against google_application_credentials=None before the isfile check if google_application_credentials and os.path.isfile( google_application_credentials ): with open(google_application_credentials) as f: google_application_credentials = f.read() payload = { "bucket": bucket, "prefix": prefix, "google_application_credentials": google_application_credentials, "title": title, "description": description, "can_delete_objects": can_delete_objects, "project": self.id, } response = self.make_request("POST", "/api/storages/export/gcs", json=payload) return response.json()

def connect_google_import_storage(self, bucket: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, google_application_credentials: Optional[str] = None, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = '', description: Optional[str] = '')
Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as source storage and import tasks.

Parameters

bucket: string - Specify the name of the GCS bucket.
prefix: string - Optional, specify the prefix or folder within the GCS bucket with your data.
regex_filter: string - Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool - Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
google_application_credentials: string - Optional, provide a file with your Google application credentials. If not specified, the path stored in the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. Read more about Google Cloud authentication.
presign: bool - Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int - Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string - Optional, specify a title for your GCS import storage that appears in Label Studio.
description: string - Optional, specify a description for your GCS import storage.

Returns

dict - containing the same fields as in the request and:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty.
last_sync_count: int - Number of tasks synced in the last sync
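A sketch that imports JPEGs from a bucket and triggers a first sync (`"gcs"` matches the `/api/storages/gcs` endpoint this method uses):

```
# Sketch: placeholder bucket name.
storage = project.connect_google_import_storage(
    bucket="my-raw-data",
    regex_filter=r".*\.jpg",
    use_blob_urls=True,
)
project.sync_import_storage("gcs", storage["id"])
```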
Source code
def connect_google_import_storage( self, bucket: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, google_application_credentials: Optional[str] = None, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", ): """Connect a Google Cloud Storage (GCS) bucket to Label Studio to use as source storage and import tasks. Parameters ---------- bucket: string Specify the name of the GCS bucket prefix: string Optional, specify the prefix or folder within the GCS bucket with your data regex_filter: string Optional, specify a regex filter to use to match the file types of your data use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. google_application_credentials: string Optional, provide a file with your Google application credentials. If not specified, it will use path stored in `GOOGLE_APPLICATION_CREDENTIALS` environmental variable. Read more about [Google Cloud authentication](https://cloud.google.com/docs/authentication/getting-started) presign: bool Optional, true by default. Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your GCS import storage that appears in Label Studio. description: string Optional, specify a description for your GCS import storage. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ if google_application_credentials and os.path.isfile( google_application_credentials ): with open(google_application_credentials) as f: google_application_credentials = f.read() payload = { "bucket": bucket, "project": self.id, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "google_application_credentials": google_application_credentials, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, } response = self.make_request("POST", "/api/storages/gcs", json=payload) return response.json() def connect_local_import_storage(self, local_store_path: [], regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, title: Optional[str] = '', description: Optional[str] = '') -
Connect local storage to Label Studio to use as source storage and import tasks.

Parameters

local_store_path: string - Path to declare as local storage.
regex_filter: string - Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool - Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
title: string - Optional, specify a title for your local import storage that appears in Label Studio.
description: string - Optional, specify a description for your local import storage.

Returns

dict - containing the same fields as in the request and:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty.
last_sync_count: int - Number of tasks synced in the last sync
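A sketch with a placeholder document root; the same root must also be configured on the Label Studio server:

```
# Sketch: /data is a placeholder; the stored path must live under the root.
import os
os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"] = "/data"
storage = project.connect_local_import_storage("/data/my-project-files")
```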
Source code
def connect_local_import_storage( self, local_store_path: [str], regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, title: Optional[str] = "", description: Optional[str] = "", ): """Connect a Local storage to Label Studio to use as source storage and import tasks. Parameters ---------- local_store_path: string Path to declare as local storage. regex_filter: string Optional, specify a regex filter to use to match the file types of your data use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. title: string Optional, specify a title for your local import storage that appears in Label Studio. description: string Optional, specify a description for your local import storage. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ if "LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT" not in os.environ: raise ValueError( "To use connect_local_import_storage() you should set " "LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT environment variable, " "read more: https://labelstud.io/guide/storage.html#Prerequisites-2" ) root = os.environ["LABEL_STUDIO_LOCAL_FILES_DOCUMENT_ROOT"] if not os.path.isdir(local_store_path): raise ValueError(f"{local_store_path} is not a directory") if (Path(root) in Path(local_store_path).parents) is False: raise ValueError( f"{str(Path(root))} is not present in local_store_path parents: " f"{str(Path(local_store_path).parents)}" ) payload = { "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "path": local_store_path, "presign": False, "presign_ttl": 1, "title": title, "description": description, "project": self.id, } response = self.make_request( "POST", f"/api/storages/localfiles?project={self.id}", json=payload ) return response.json()

def connect_s3_export_storage(self, bucket: str, prefix: Optional[str] = None, title: Optional[str] = '', description: Optional[str] = '', aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, can_delete_objects: bool = False)
Connect an Amazon S3 bucket to Label Studio to use as target storage and export tasks.

Parameters

bucket: string - Specify the name of the S3 bucket.
prefix: string - Optional, specify the prefix or folder within the S3 bucket to export your data to.
title: string - Optional, specify a title for your S3 export storage that appears in Label Studio.
description: string - Optional, specify a description for your S3 export storage.
aws_access_key_id: string - Optional, specify the access key ID for your bucket.
aws_secret_access_key: string - Optional, specify the secret access key for your bucket.
aws_session_token: string - Optional, specify a session token to use to access your bucket.
region_name: string - Optional, specify the AWS region of your S3 bucket.
s3_endpoint: string - Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method.
can_delete_objects: bool - False by default. Specify whether to delete tasks in the S3 bucket if they are deleted in Label Studio.

Returns

dict - containing the same fields as in the request and:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty.
last_sync_count: int - Number of tasks synced in the last sync
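A sketch with placeholder credentials; a custom `s3_endpoint` would also cover S3-compatible stores such as MinIO:

```
# Sketch: placeholder bucket and credentials.
storage = project.connect_s3_export_storage(
    bucket="my-export-bucket",
    prefix="label-studio/",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="SECRET",
    region_name="us-east-1",
)
```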
Source code
def connect_s3_export_storage( self, bucket: str, prefix: Optional[str] = None, title: Optional[str] = "", description: Optional[str] = "", aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, can_delete_objects: bool = False, ): """Connect an Amazon S3 bucket to Label Studio to use as target storage and export tasks. Parameters ---------- bucket: string Specify the name of the S3 bucket. prefix: string Optional, specify the prefix or folder within the S3 bucket to export your data to. title: string Optional, specify a title for your S3 export storage that appears in Label Studio. description: string Optional, specify a description for your S3 export storage. aws_access_key_id: string Optional, specify the access key ID for your bucket. aws_secret_access_key: string Optional, specify the secret access key for your bucket. aws_session_token: string Optional, specify a session token to use to access your bucket. region_name: string Optional, specify the AWS region of your S3 bucket. s3_endpoint: string Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method. can_delete_objects: bool False by default. Specify whether to delete tasks in the S3 bucket if they are deleted in Label Studio. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ payload = { "bucket": bucket, "prefix": prefix, "aws_access_key_id": aws_access_key_id, "aws_secret_access_key": aws_secret_access_key, "aws_session_token": aws_session_token, "region_name": region_name, "s3_endpoint": s3_endpoint, "title": title, "description": description, "can_delete_objects": can_delete_objects, "project": self.id, } response = self.make_request("POST", "/api/storages/export/s3", json=payload) return response.json() def connect_s3_import_storage(self, bucket: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = '', description: Optional[str] = '', aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, recursive_scan: Optional[bool] = False)-
Connect an Amazon S3 bucket to Label Studio to use as source storage and import tasks.

Parameters

bucket: string - Specify the name of the S3 bucket.
prefix: string - Optional, specify the prefix within the S3 bucket to import your data from.
regex_filter: string - Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool - Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
presign: bool - Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int - Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string - Optional, specify a title for your S3 import storage that appears in Label Studio.
description: string - Optional, specify a description for your S3 import storage.
aws_access_key_id: string - Optional, specify the access key ID for your bucket.
aws_secret_access_key: string - Optional, specify the secret access key for your bucket.
aws_session_token: string - Optional, specify a session token to use to access your bucket.
region_name: string - Optional, specify the AWS region of your S3 bucket.
s3_endpoint: string - Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method.
recursive_scan: bool - Optional, specify whether to perform recursive scan over the bucket content.

Returns

dict - containing the same fields as in the request and:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty.
last_sync_count: int - Number of tasks synced in the last sync
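A sketch with placeholder bucket and credentials, raising the presigned-URL lifetime from the default:

```
# Sketch: presigned URLs stay valid for 60 minutes instead of the default 1.
storage = project.connect_s3_import_storage(
    bucket="my-raw-data",
    regex_filter=r".*\.(jpg|png)",
    presign_ttl=60,
    aws_access_key_id="AKIA...",
    aws_secret_access_key="SECRET",
    region_name="us-east-1",
)
project.sync_import_storage("s3", storage["id"])
```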
Source code
def connect_s3_import_storage( self, bucket: str, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", aws_access_key_id: Optional[str] = None, aws_secret_access_key: Optional[str] = None, aws_session_token: Optional[str] = None, region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, recursive_scan: Optional[bool] = False, ): """Connect an Amazon S3 bucket to Label Studio to use as source storage and import tasks. Parameters ---------- bucket: string Specify the name of the S3 bucket. prefix: string Optional, specify the prefix within the S3 bucket to import your data from. regex_filter: string Optional, specify a regex filter to use to match the file types of your data. use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. presign: bool Optional, true by default. Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your S3 import storage that appears in Label Studio. description: string Optional, specify a description for your S3 import storage. aws_access_key_id: string Optional, specify the access key ID for your bucket. aws_secret_access_key: string Optional, specify the secret access key for your bucket. aws_session_token: string Optional, specify a session token to use to access your bucket. region_name: string Optional, specify the AWS region of your S3 bucket. s3_endpoint: string Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method. recursive_scan: bool Optional, specify whether to perform recursive scan over the bucket content. Returns ------- dict: containing the same fields as in the request and: id: int Storage ID type: str Type of storage created_at: str Creation time last_sync: str Time last sync finished, can be empty. last_sync_count: int Number of tasks synced in the last sync """ payload = { "bucket": bucket, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "aws_access_key_id": aws_access_key_id, "aws_secret_access_key": aws_secret_access_key, "aws_session_token": aws_session_token, "region_name": region_name, "s3_endpoint": s3_endpoint, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, "project": self.id, "recursive_scan": recursive_scan, } response = self.make_request("POST", "/api/storages/s3", json=payload) return response.json() def connect_s3s_iam_import_storage(self, role_arn: str, external_id: Optional[str] = None, bucket: Optional[str] = None, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = '', description: Optional[str] = '', region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, recursive_scan: Optional[bool] = False, aws_sse_kms_key_id: Optional[str] = None)-
Create S3 secured import storage with IAM role access. Enterprise only.
Parameters
role_arn: string - Required, specify the AWS Role ARN to assume.
external_id: string or None - Optional, specify the external ID to use to assume the role. If None, the SDK calls api/organizations/<id> and uses external_id from the response. You can find this ID on the organization page in the Label Studio UI.
bucket: string - Specify the name of the S3 bucket.
prefix: string - Optional, specify the prefix within the S3 bucket to import your data from.
regex_filter: string - Optional, specify a regex filter to use to match the file types of your data.
use_blob_urls: bool - Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks.
presign: bool - Optional, true by default. Specify whether or not to create presigned URLs.
presign_ttl: int - Optional, 1 by default. Specify how long to keep presigned URLs active.
title: string - Optional, specify a title for your S3 import storage that appears in Label Studio.
description: string - Optional, specify a description for your S3 import storage.
region_name: string - Optional, specify the AWS region of your S3 bucket.
s3_endpoint: string - Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method.
recursive_scan: bool - Optional, specify whether to perform a recursive scan over the bucket content.
aws_sse_kms_key_id: string - Optional, specify an AWS SSE KMS Key ID for server-side encryption.
synchronizable, last_sync, last_sync_count, last_sync_job, status, traceback, meta - Parameters for synchronization details and storage status.
Returns
dict - containing the response from the API, including storage ID and type, among other details.
source code
def connect_s3s_iam_import_storage( self, role_arn: str, external_id: Optional[str] = None, bucket: Optional[str] = None, prefix: Optional[str] = None, regex_filter: Optional[str] = None, use_blob_urls: Optional[bool] = True, presign: Optional[bool] = True, presign_ttl: Optional[int] = 1, title: Optional[str] = "", description: Optional[str] = "", region_name: Optional[str] = None, s3_endpoint: Optional[str] = None, recursive_scan: Optional[bool] = False, aws_sse_kms_key_id: Optional[str] = None, ): """Create S3 secured import storage with IAM role access. Enterprise only. Parameters ---------- role_arn: string Required, specify the AWS Role ARN to assume. external_id: string or None Optional, specify the external ID to use to assume the role. If None, SDK will call api/organizations/<id> and use external_id from the response. You can find this ID on the organization page in the Label Studio UI. bucket: string Specify the name of the S3 bucket. prefix: string Optional, specify the prefix within the S3 bucket to import your data from. regex_filter: string Optional, specify a regex filter to use to match the file types of your data. use_blob_urls: bool Optional, true by default. Specify whether your data is raw image or video data, or JSON tasks. presign: bool Optional, true by default. Specify whether or not to create presigned URLs. presign_ttl: int Optional, 1 by default. Specify how long to keep presigned URLs active. title: string Optional, specify a title for your S3 import storage that appears in Label Studio. description: string Optional, specify a description for your S3 import storage. region_name: string Optional, specify the AWS region of your S3 bucket. s3_endpoint: string Optional, specify an S3 endpoint URL to use to access your bucket instead of the standard access method. recursive_scan: bool Optional, specify whether to perform recursive scan over the bucket content. aws_sse_kms_key_id: string Optional, specify an AWS SSE KMS Key ID for server-side encryption. synchronizable, last_sync, last_sync_count, last_sync_job, status, traceback, meta: Parameters for synchronization details and storage status. Returns ------- dict: containing the response from the API including storage ID and type, among other details. """ if external_id is None: organization = self.get_organization() external_id = organization["external_id"] payload = { "bucket": bucket, "prefix": prefix, "regex_filter": regex_filter, "use_blob_urls": use_blob_urls, "presign": presign, "presign_ttl": presign_ttl, "title": title, "description": description, "recursive_scan": recursive_scan, "role_arn": role_arn, "region_name": region_name, "s3_endpoint": s3_endpoint, "aws_sse_kms_key_id": aws_sse_kms_key_id, "project": self.id, "external_id": external_id, } response = self.make_request("POST", "/api/storages/s3s/", json=payload) return response.json() def create_annotation(self, task_id: int, **kwargs) ‑> Dict-
Add annotations to a task like an annotator does.
Parameters
task_id: int - Task ID you want to update
kwargs: kwargs parameters - List of parameters to create. Check all available parameters here.
Labeling is stored in the result field as a list of dictionaries, [{…}, {…}, …]
Returns
dict - Dict with the created annotation
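For example, a minimal sketch submitting a single choice; the task ID and tag names are placeholders matching a `Choices`/`Image` labeling config, and `project` is assumed to come from `Client.get_project()`:

```python
# Task 42 is a placeholder; the result format mirrors annotation results.
annotation = project.create_annotation(
    task_id=42,
    result=[
        {
            "from_name": "class_name",   # name of the <Choices> control tag
            "to_name": "image",          # name of the <Image> object tag
            "type": "choices",
            "value": {"choices": ["Class A"]},
        }
    ],
)
print(annotation["id"])
```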
source code
def create_annotation(self, task_id: int, **kwargs) -> Dict:
    """Add annotations to a task like an annotator does.

    Parameters
    ----------
    task_id: int
        Task ID you want to update
    kwargs: kwargs parameters
        List of parameters to create. Check all available parameters
        [here](https://labelstud.io/api#operation/api_tasks_annotations_create).
        Labeling is stored in the `result` field as a list of dictionaries,
        [{...}, {...}, ...]

    Returns
    -------
    dict:
        Dict with created annotation
    """
    response = self.make_request(
        "POST", f"/api/tasks/{task_id}/annotations/", json=kwargs
    )
    response.raise_for_status()
    return response.json()

def create_annotations_from_predictions(self, model_versions=None)
Create annotations from all predictions that exist for project tasks from specific ML model versions.
Parameters
model_versions: list or None - Convert predictions with these model versions to annotations. If None, all existing model versions are used.
Returns
dict - Dict with counter of created predictions
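A minimal sketch promoting one model version's predictions to annotations (the version string is a placeholder):

```python
# Convert every prediction tagged "2021-01-01" into an annotation.
result = project.create_annotations_from_predictions(model_versions=["2021-01-01"])
print(result)  # the response includes a counter of processed tasks
```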
source code
def create_annotations_from_predictions(self, model_versions=None):
    """Create annotations from all predictions that exist for project tasks
    from specific ML model versions.

    Parameters
    ----------
    model_versions: list or None
        Convert predictions with these model versions to annotations. If
        `None`, all existing model versions are used

    Returns
    -------
    dict
        Dict with counter of created predictions
    """
    payload = {
        "filters": {"conjunction": "and", "items": []},
        "model_version": model_versions,
        "ordering": [],
        "project": self.id,
        "selectedItems": {"all": True, "excluded": []},
    }
    response = self.make_request(
        "POST",
        "/api/dm/actions",
        params={"id": "predictions_to_annotations", "project": self.id},
        json=payload,
    )
    return response.json()

def create_prediction(self, task_id: int, result: Optional[Union[List[Dict], Dict, str]] = None, score: Optional[float] = 0, model_version: Optional[str] = None)
Create a prediction for a specific task.
Parameters
task_id: int - Task ID
result: list or dict or str - Result in the Label Studio JSON format, as for annotations. For the labeling config:
<View> <Image name="image" value="$value"/> <Choices name="class_name" toName="image"> <Choice value="Class A"/> <Choice value="Class B"/> </Choices> </View>
the following inputs are equivalent; the result can be either the full "predictions" payload:
[{ "from_name": "class_name", "to_name": "image", "type": "choices", "value": { "choices": ["Class A"] } }]
or just the "value" payload:
{"choices": ["Class A"]}
or just the class name:
"Class A"
score: float - Model prediction score
model_version: str - Any string identifying your model
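Taken together, a minimal sketch of the three equivalent forms (task ID 42 and the model version string are placeholders):

```python
full_result = [{
    "from_name": "class_name",
    "to_name": "image",
    "type": "choices",
    "value": {"choices": ["Class A"]},
}]
# All three calls produce the same prediction:
project.create_prediction(42, result=full_result, score=0.95, model_version="v1")
project.create_prediction(42, result={"choices": ["Class A"]}, score=0.95, model_version="v1")
project.create_prediction(42, result="Class A", score=0.95, model_version="v1")
```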
source code
def create_prediction( self, task_id: int, result: Optional[Union[List[Dict], Dict, str]] = None, score: Optional[float] = 0, model_version: Optional[str] = None, ): """Create a prediction for a specific task. Parameters ---------- task_id: int Task ID result: list or dict or str Result in the <a href="https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks"> Label Studio JSON format as for annotations</a>. For the labeling config: <View> <Image name="image" value="$value"/> <Choices name="class_name" toName="image"> <Choice value="Class A"/> <Choice value="Class B"/> </Choices> </View> The following inputs are equivalent, result could be either full `"predictions"`: [{ "from_name": "class_name", "to_name": "image", "type": "choices", "value": { "choices": ["Class A"] } }] or just `"value"` payload {"choices": ["Class A"]} or just the class name: "Class A" score: float Model prediction score model_version: str Any string identifying your model """ data = {"task": task_id, "result": result, "score": score} if model_version is not None: data["model_version"] = model_version response = self.make_request("POST", "/api/predictions", json=data) json = response.json() logger.debug(f"Response: {json}") return json def create_predictions(self, predictions)-
Bulk create predictions for tasks. See more details about pre-annotated tasks.
Parameters
predictions: list of dicts - List of dicts with predictions in the Label Studio JSON format, as for annotations.
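A minimal bulk sketch, assuming each dict carries the target task ID in a `task` key alongside the result (the task ID, tag names, and model version are placeholders):

```python
project.create_predictions([
    {
        "task": 1,  # placeholder task ID
        "model_version": "v1",
        "score": 0.9,
        "result": [{
            "from_name": "class_name",
            "to_name": "image",
            "type": "choices",
            "value": {"choices": ["Class A"]},
        }],
    },
])
```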
source code
def create_predictions(self, predictions):
    """Bulk create predictions for tasks. See <a href="https://labelstud.io/guide/predictions.html">
    more details about pre-annotated tasks</a>.

    Parameters
    ----------
    predictions: list of dicts
        List of dicts with predictions in the
        <a href="https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks">
        Label Studio JSON format as for annotations</a>.
    """
    response = self.make_request(
        "POST", f"/api/projects/{self.id}/import/predictions", json=predictions
    )
    return response.json()

def create_view(self, filters, ordering=None, title='Tasks')
Create a view
Parameters
filters: dict - Specify the filters (Filters) of the view
ordering: list of Column - List with one string representing Data Manager ordering. Use the Column helper class. Example: [Column.total_annotations], ['-' + Column.total_annotations] - inverted order
title: str - Tab name
Returns
dict - dict with the created view
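A minimal sketch, assuming the `Filters`/`Column`/`Operator`/`Type` helpers from `label_studio_sdk.data_manager`; the filter value and tab title are placeholders:

```python
from label_studio_sdk.data_manager import Filters, Column, Operator, Type

# A tab showing tasks with ID >= 1, most-annotated tasks first.
filters = Filters.create(Filters.AND, [
    Filters.item(Column.id, Operator.GREATER_OR_EQUAL, Type.Number, Filters.value(1)),
])
view = project.create_view(
    filters=filters,
    ordering=["-" + Column.total_annotations],
    title="My SDK tab",
)
print(view["id"])
```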
source code
def create_view(self, filters, ordering=None, title="Tasks"):
    """Create view

    Parameters
    ----------
    filters: dict
        Specify the filters (`label_studio_sdk.data_manager.Filters`) of the view
    ordering: list of label_studio_sdk.data_manager.Column
        List with <b>one</b> string representing Data Manager ordering.
        Use the `label_studio_sdk.data_manager.Column` helper class.
        Example: ```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order
    title: str
        Tab name

    Returns
    -------
    dict:
        dict with created view
    """
    data = {
        "project": self.id,
        "data": {"title": title, "ordering": ordering, "filters": filters},
    }
    response = self.make_request("POST", "/api/dm/views", json=data)
    return response.json()

def delete_all_tasks(self, excluded_ids: list = None) -> requests.models.Response
Delete all tasks from the project.
Parameters
excluded_ids: list of int - Task IDs that should be excluded from the deletion.
source code
def delete_all_tasks(self, excluded_ids: list = None) -> Response:
    """Delete all tasks from the project.

    Parameters
    ----------
    excluded_ids: list of int
        Task ids that should be excluded from the deletion.
    """
    assert (
        isinstance(excluded_ids, list) or excluded_ids is None
    ), "excluded_ids should be list of int or None"
    if excluded_ids is None:
        excluded_ids = []
    payload = {
        "selectedItems": {"all": True, "excluded": excluded_ids},
        "project": self.id,
    }
    return self.make_request(
        "POST", f"/api/dm/actions?project={self.id}&id=delete_tasks", json=payload
    )

def delete_annotation(self, annotation_id: int) -> int
Delete an annotation using the annotation ID. This action can't be undone!
Parameters
annotation_id: int - A unique integer value identifying this annotation.
Returns
int - Status code for the operation
source code
def delete_annotation(self, annotation_id: int) -> int:
    """Delete an annotation using the annotation ID. This action can't be undone!

    Parameters
    ----------
    annotation_id: int
        A unique integer value identifying this annotation.

    Returns
    -------
    int
        Status code for operation
    """
    response = self.make_request("DELETE", f"/api/annotations/{annotation_id}")
    response.raise_for_status()
    return response.status_code

def delete_annotators_assignment(self, tasks_ids)
Remove all assigned annotators for tasks
Parameters
tasks_ids: list of int
Returns
dict - Dict with counter of deleted annotator assignments
source code
def delete_annotators_assignment(self, tasks_ids):
    """Remove all assigned annotators for tasks

    Parameters
    ----------
    tasks_ids: list of int

    Returns
    -------
    dict
        Dict with counter of deleted annotator assignments
    """
    payload = {"selectedItems": {"all": False, "included": tasks_ids}}
    response = self.make_request(
        "POST",
        f"/api/dm/actions?id=delete_annotators&project={self.id}",
        json=payload,
    )
    return response.json()

def delete_reviewers_assignment(self, tasks_ids)
Clear all assigned reviewers for tasks
Parameters
tasks_ids: list of int
Returns
dict - Dict with counter of deleted reviewer assignments
source code
def delete_reviewers_assignment(self, tasks_ids):
    """Clear all assigned reviewers for tasks

    Parameters
    ----------
    tasks_ids: list of int

    Returns
    -------
    dict
        Dict with counter of deleted reviewer assignments
    """
    payload = {"selectedItems": {"all": False, "included": tasks_ids}}
    response = self.make_request(
        "POST",
        f"/api/dm/actions?id=delete_reviewers&project={self.id}",
        json=payload,
    )
    return response.json()

def delete_task(self, task_id: int) -> requests.models.Response
Delete a task. To delete multiple tasks, use delete_tasks().
Parameters
task_id: int - Task ID.
source code
def delete_task(self, task_id: int) -> Response:
    """Delete a task. To remove multiple tasks, use `delete_tasks()`.

    Parameters
    ----------
    task_id: int
        Task id.
    """
    assert isinstance(task_id, int), "task_id should be int"
    return self.make_request("DELETE", f"/api/tasks/{task_id}")

def delete_tasks(self, task_ids: list) -> requests.models.Response
Delete multiple tasks by their IDs.
Parameters
task_ids: list of int - Task IDs.
source code
def delete_tasks(self, task_ids: list) -> Response:
    """Delete multiple tasks by IDs.

    Parameters
    ----------
    task_ids: list of int
        Task ids.
    """
    assert isinstance(task_ids, list), "task_ids should be list of int"
    if not task_ids:  # avoid deletion of all tasks when task_ids = []
        return Response()
    payload = {
        "selectedItems": {"all": False, "included": task_ids},
        "project": self.id,
    }
    return self.make_request(
        "POST", f"/api/dm/actions?project={self.id}&id=delete_tasks", json=payload
    )

def delete_view(self, view_id)
Delete a view
Parameters
view_id: int - View ID
Returns
dict - dict with the deleted view
source code
def delete_view(self, view_id):
    """Delete view

    Parameters
    ----------
    view_id: int
        View ID

    Returns
    -------
    dict:
        dict with deleted view
    """
    response = self.make_request("DELETE", f"/api/dm/views/{view_id}")
    return

def export(self, filters=None, title='SDK Export', export_type='JSON', output_dir='.', **kwargs)
Export tasks from the project, with optional filters, and save the exported data to a specified directory.
This method: (1) creates a temporary view with the specified filters if filters is not None, (2) creates a new export snapshot using the view ID, (3) polls the status of the snapshot while it is being created, (4) downloads the snapshot file in the specified export format, and (5) cleans up and removes the temporary view once the export completes.
Parameters
filters: data_manager.Filters, dict, optional - Filters to apply when exporting tasks. If provided, a temporary view is created with these filters. The format of the filters should match the Label Studio filter options. Default is None, which means all tasks are exported. Use label_studio_sdk.data_manager.Filters.create() to create filters. Example of the filters JSON format:
{ "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] }
title: str, optional - The title of the export snapshot. Default is 'SDK Export'.
export_type: str, optional - The format of the exported data. It should be one of the formats supported by Label Studio ('JSON', 'CSV', etc.). Default is 'JSON'.
output_dir: str, optional - The directory where the exported file will be saved. Default is the current directory.
kwargs: kwargs, optional - The same parameters as in the export_snapshot_create method.
Returns
dict - containing the status of the export, the filename of the exported file, and the export ID:
filename: str - Path to the downloaded export file
status: int - 200 is ok
export_id: int - Export ID; you can retrieve more details about this export using this ID
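A minimal end-to-end sketch; the filter targets task ID 1 and the output directory is a placeholder:

```python
# Export only task 1 as JSON into ./exports; omit `filters` to export everything.
result = project.export(
    filters={
        "conjunction": "and",
        "items": [{
            "filter": "filter:tasks:id",
            "operator": "equal",
            "type": "Number",
            "value": 1,
        }],
    },
    export_type="JSON",
    output_dir="exports",
)
print(result["status"], result["filename"], result["export_id"])
```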
source code
def export(
    self,
    filters=None,
    title="SDK Export",
    export_type="JSON",
    output_dir=".",
    **kwargs,
):
    """
    Export tasks from the project with optional filters, and save the exported
    data to a specified directory.

    This method:
    (1) creates a temporary view with the specified filters if they are not None,
    (2) creates a new export snapshot using the view ID,
    (3) checks the status of the snapshot creation while it's in progress,
    (4) and downloads the snapshot file in the specified export format.
    (5) After the export, it cleans up and removes the temporary view.

    Parameters
    ----------
    filters : data_manager.Filters, dict, optional
        Filters to apply when exporting tasks. If provided, a temporary view
        is created with these filters. The format of the filters should match
        the Label Studio filter options. Default is None, which means all
        tasks are exported. Use label_studio_sdk.data_manager.Filters.create()
        to create filters. Example of the filters JSON format:
        ```json
        {
          "conjunction": "and",
          "items": [
            {
              "filter": "filter:tasks:id",
              "operator": "equal",
              "type": "Number",
              "value": 1
            }
          ]
        }
        ```
    title : str, optional
        The title of the export snapshot. Default is 'SDK Export'.
    export_type : str, optional
        The format of the exported data. It should be one of the formats
        supported by Label Studio ('JSON', 'CSV', etc.). Default is 'JSON'.
    output_dir : str, optional
        The directory where the exported file will be saved. Default is the current directory.
    kwargs : kwargs, optional
        The same parameters as in the export_snapshot_create method.

    Returns
    -------
    dict
        containing the status of the export, the filename of the exported file,
        and the export ID.
        filename : str
            Path to the downloaded export file
        status : int
            200 is ok
        export_id : int
            Export ID, you can retrieve more details about this export using this ID
    """
    # Create a temporary view with the specified filters
    if filters:
        view = self.create_view(title="Temp SDK export", filters=filters)
        task_filter_options = {"view": view["id"]}
    else:
        task_filter_options = None
        view = None

    # Create a new export snapshot using the view ID
    export_result = self.export_snapshot_create(
        title=title,
        task_filter_options=task_filter_options,
        **kwargs,
    )

    # Check the status of the snapshot creation
    export_id = export_result["id"]
    while self.export_snapshot_status(export_id).is_in_progress():
        time.sleep(1.0)  # Wait until the snapshot is ready

    os.makedirs(output_dir, exist_ok=True)
    # Download the snapshot file once it's ready
    status, filename = self.export_snapshot_download(
        export_id, export_type=export_type, path=output_dir
    )

    # Clean up the view
    if view:
        self.delete_view(view["id"])

    return {"status": status, "filename": filename, "export_id": export_id}

def export_snapshot_create(self, title: str, task_filter_options: dict = None, serialization_options_drafts: bool = True, serialization_options_predictions: bool = True, serialization_options_annotations__completed_by: bool = True, annotation_filter_options_usual: bool = True, annotation_filter_options_ground_truth: bool = True, annotation_filter_options_skipped: bool = True, interpolate_key_frames: bool = False) -> dict
Create a new export snapshot
Parameters
title: str - Export title
task_filter_options: dict - Task filter options; use {"view": tab_id} to apply the filter from that tab. Check the API parameters for more details.
serialization_options_drafts: bool - Expand drafts (False) or include only ID (True)
serialization_options_predictions: bool - Expand predictions (False) or include only ID (True)
serialization_options_annotations__completed_by: bool - Expand the user in completed_by (False) or include only ID (True)
annotation_filter_options_usual: bool - Include not cancelled and not ground truth annotations
annotation_filter_options_ground_truth: bool - Filter ground truth annotations
annotation_filter_options_skipped: bool - Filter skipped annotations
interpolate_key_frames: bool - Interpolate key frames into sequence
Returns
dict - containing the same fields as in the request and the created export fields:
id: int - Export ID
created_at: str - Creation time
status: str - Export status
created_by: dict - User data
finished_at: str - Finished time
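The snapshot methods are typically chained: create, poll, then download. A minimal sketch:

```python
import time

# Create a snapshot, wait for it to complete, then fetch it as CSV.
snapshot = project.export_snapshot_create(title="Weekly backup")
export_id = snapshot["id"]
while project.export_snapshot_status(export_id).is_in_progress():
    time.sleep(1.0)
status, filename = project.export_snapshot_download(export_id, export_type="CSV")
print(status, filename)
```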
source code
def export_snapshot_create( self, title: str, task_filter_options: dict = None, serialization_options_drafts: bool = True, serialization_options_predictions: bool = True, serialization_options_annotations__completed_by: bool = True, annotation_filter_options_usual: bool = True, annotation_filter_options_ground_truth: bool = True, annotation_filter_options_skipped: bool = True, interpolate_key_frames: bool = False, ) -> dict: """ Create new export snapshot ---------- Parameters ---------- title: str Export title task_filter_options: dict Task filter options, use {"view": tab_id} to apply filter from this tab, <a href="https://api.labelstud.io/#operation/api_projects_exports_create">check the API parameters for more details</a> serialization_options_drafts: bool Expand drafts (False) or include only ID (True) serialization_options_predictions: bool Expand predictions (False) or include only ID (True) serialization_options_annotations__completed_by: bool Expand user that completed_by (False) or include only ID (True) annotation_filter_options_usual: bool Include not cancelled and not ground truth annotations annotation_filter_options_ground_truth: bool Filter ground truth annotations annotation_filter_options_skipped: bool Filter skipped annotations interpolate_key_frames: bool Interpolate key frames into sequence Returns ------- dict: containing the same fields as in the request and the created export fields: id: int Export ID created_at: str Creation time status: str Export status created_by: dict User data finished_at: str Finished time """ if task_filter_options is None: task_filter_options = {} payload = { "title": title, "serialization_options": { "drafts": {"only_id": serialization_options_drafts}, "predictions": {"only_id": serialization_options_predictions}, "annotations__completed_by": { "only_id": serialization_options_annotations__completed_by }, "interpolate_key_frames": interpolate_key_frames, }, "task_filter_options": task_filter_options, "annotation_filter_options": { "usual": annotation_filter_options_usual, "ground_truth": annotation_filter_options_ground_truth, "skipped": annotation_filter_options_skipped, }, } response = self.make_request( "POST", f"/api/projects/{self.id}/exports?interpolate_key_frames={interpolate_key_frames}", json=payload, ) return response.json() def export_snapshot_delete(self, export_id: int) ‑> int-
Delete an export file by the specified export ID
Parameters
export_id: int - Existing export ID from the current project
Returns
Status code for the operation
source code
def export_snapshot_delete(self, export_id: int) -> int:
    """Delete an export file by specified export ID

    Parameters
    ----------
    export_id: int
        Existing Export ID from current project

    Returns
    -------
    Status code for operation
    """
    response = self.make_request(
        "DELETE", f"/api/projects/{self.id}/exports/{export_id}"
    )
    return response.status_code

def export_snapshot_download(self, export_id: int, export_type: str = 'JSON', path: str = '.') -> (int, str)
Download the export snapshot file in the specified format
Parameters
export_id: int - Existing export ID from the current project. Can be referred to as id from self.exports()
export_type: str - Default export_type is JSON. Specify another format type as referenced in the Label Studio converter code.
path: str - Default path to store downloaded files
Returns
Status code for the operation and the downloaded filename
source code
def export_snapshot_download( self, export_id: int, export_type: str = "JSON", path: str = "." ) -> (int, str): """ Download file with export snapshot in provided format ---------- Parameters ---------- export_id: int Existing Export ID from current project. Can be referred as id from self.exports() export_type: str Default export_type is JSON. Specify another format type as referenced in <a href="https://github.com/heartexlabs/label-studio-converter/blob/master/label_studio_converter/converter.py#L32"> the Label Studio converter code</a>. path: str Default path to store downloaded files Returns ------- Status code for operation and downloaded filename """ response = self.make_request( "GET", f"/api/projects/{self.id}/exports/{export_id}/download?exportType={export_type}", ) filename = None if response.status_code == 200: content_disposition = response.headers.get("Content-Disposition") if content_disposition: filename = content_disposition.split("filename=")[-1].strip("\"'") filename = os.path.basename(filename) else: raise LabelStudioException("No filename in response") with open(os.path.join(path, filename), "wb") as f: for chk in response: f.write(chk) return response.status_code, filename def export_snapshot_list(self) ‑> list-
Get the list of export snapshots for the current project
Returns
list[dict] - List of dicts with export snapshots and their status:
id: int - Export ID
created_at: str - Creation time
status: str - Export status
created_by: dict - User data
finished_at: str - Finished time
source code
def export_snapshot_list(self) -> list:
    """
    Get list of export snapshots for the current project

    Returns
    -------
    list[dict]
        List of dict with export snapshots with status:
        id: int
            Export ID
        created_at: str
            Creation time
        status: str
            Export status
        created_by: dict
            User data
        finished_at: str
            Finished time
    """
    response = self.make_request("GET", f"/api/projects/{self.id}/exports")
    return response.json()

def export_snapshot_status(self, export_id: int) -> ExportSnapshotStatus
Get the export snapshot status by export ID
Parameters
export_id: int - Existing export ID from the current project. Can be referred to as id from self.exports()
Returns
ExportSnapshotStatus - ExportSnapshotStatus.response is a dict and contains the following fields:
id: int - Export ID
created_at: str - Creation time
status: str - created, completed, in_progress, failed
created_by: dict - User data
finished_at: str - Finished time
source code
def export_snapshot_status(self, export_id: int) -> ExportSnapshotStatus:
    """
    Get export snapshot status by Export ID

    Parameters
    ----------
    export_id: int
        Existing Export ID from current project. Can be referred as id from self.exports()

    Returns
    -------
    `label_studio_sdk.project.ExportSnapshotStatus`
        ExportSnapshotStatus.response is dict and contains the following fields:
        id: int
            Export ID
        created_at: str
            Creation time
        status: str
            created, completed, in_progress, failed
        created_by: dict
            User data
        finished_at: str
            Finished time
    """
    response = self.make_request(
        "GET", f"/api/projects/{self.id}/exports/{export_id}"
    )
    return ExportSnapshotStatus(response.json())

def export_tasks(self, export_type: str = 'JSON', download_all_tasks: bool = False, download_resources: bool = False, ids: Optional[List[int]] = None, export_location: Optional[str] = None) -> Union[list, pathlib.Path]
Export annotated tasks.
Parameters
export_type: string - Default export_type is JSON. Specify another format type as referenced in the Label Studio converter code.
download_all_tasks: bool - Default is False. If true, download all tasks regardless of status. If false, download only annotated tasks.
download_resources: bool - Default is False. If true, download all resource files such as images, audio, and others relevant to the tasks.
ids: list of ints - Optional, specify a list of task IDs to retrieve only the details for those tasks.
export_location: str or path - Optional, specify a location to save the export to; this is mandatory for the YOLO export. A pathlib.Path object will be returned instead of the deserialized JSON.
Returns
list of dicts if export_location is None - Tasks with annotations
pathlib.Path if export_location is not None - Path to the export
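A minimal sketch of both return modes (the output path is a placeholder):

```python
# In-memory JSON export of annotated tasks...
tasks = project.export_tasks(export_type="JSON")
print(len(tasks))

# ...or a file-based export; formats like YOLO require export_location.
path = project.export_tasks(export_type="YOLO", export_location="exports/yolo.zip")
print(path)  # pathlib.Path to the written archive
```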
source code
def export_tasks( self, export_type: str = "JSON", download_all_tasks: bool = False, download_resources: bool = False, ids: Optional[List[int]] = None, export_location: Optional[str] = None, ) -> Union[list, pathlib.Path]: """Export annotated tasks. Parameters ---------- export_type: string Default export_type is JSON. Specify another format type as referenced in <a href="https://github.com/heartexlabs/label-studio-converter/blob/master/label_studio_converter/converter.py#L32"> the Label Studio converter code</a>. download_all_tasks: bool Default download_all_tasks is False. If true, download all tasks regardless of status. If false, download only annotated tasks. download_resources: bool Default download_resources is False. If true, download all resource files such as images, audio, and others relevant to the tasks. ids: list of ints Optional, specify a list of task IDs to retrieve only the details for those tasks. export_location: str or path Optional, specify a location to save the export to, this is mandatory for the YOLO export. A pathlib.Path object will be returned instead of the deserialized json. Returns ------- list of dicts if export_location is None Tasks with annotations pathlib.Path if export_location is not None Path to the export """ params = { "exportType": export_type, "download_all_tasks": download_all_tasks, "download_resources": download_resources, } if ids: params["ids"] = ids response = self.make_request( method="GET", url=f"/api/projects/{self.id}/export", params=params ) if export_location is None: if "JSON" not in export_type.upper(): raise ValueError( f"{export_type} export type requires an export location to be specified" ) return response.json() export_path = pathlib.Path(export_location) # ensure that parent location exists even if it is in some subdirectory export_path.parent.mkdir(parents=True, exist_ok=True) with open(export_path, "wb") as out_file: for chunk in response.iter_content( chunk_size=1024 ): # 1 kib seems reasonable out_file.write(chunk) return export_path def get_annotation(self, annotation_id: int) ‑> dict-
Retrieve a specific annotation for a task using the annotation ID.
Parameters
annotation_id: int - A unique integer value identifying this annotation.
Returns
dict - Retrieved annotation object
source code
def get_annotation(self, annotation_id: int) -> dict:
    """Retrieve a specific annotation for a task using the annotation ID.

    Parameters
    ----------
    annotation_id: int
        A unique integer value identifying this annotation.

    Returns
    -------
    dict
        Retrieved annotation object
    """
    response = self.make_request("GET", f"/api/annotations/{annotation_id}")
    response.raise_for_status()
    return response.json()

def get_export_storages(self)
Get export (target) cloud storage.
Returns
list of dicts - List of dicts with target storages.
Each dict consists of these fields:
id: int - A unique integer value identifying this storage.
type: str - The type of the storage. Default is "s3".
synchronizable: bool - Indicates if the storage is synchronizable. Default is True.
last_sync: str or None - The last sync finished time. Can be None.
last_sync_count: int or None - The count of tasks synced last time. Can be None.
last_sync_job: str or None - The last sync job ID. Can be None.
status: str - The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed".
traceback: str or None - The traceback report for the last failed sync. Can be None.
meta: dict or None - Meta and debug information about storage processes. Can be None.
title: str or None - The title of the cloud storage. Can be None.
description: str or None - The description of the cloud storage. Can be None.
created_at: str - The creation time of the storage.
can_delete_objects: bool or None - Deletion from storage enabled. Can be None.
bucket: str or None - The S3 bucket name. Can be None.
prefix: str or None - The S3 bucket prefix. Can be None.
regex_filter: str or None - The cloud storage regex for filtering objects. Can be None.
use_blob_urls: bool - Indicates if objects are interpreted as BLOBs and generate URLs.
aws_access_key_id: str or None - The AWS_ACCESS_KEY_ID. Can be None.
aws_secret_access_key: str or None - The AWS_SECRET_ACCESS_KEY. Can be None.
aws_session_token: str or None - The AWS_SESSION_TOKEN. Can be None.
aws_sse_kms_key_id: str or None - The AWS SSE KMS Key ID. Can be None.
region_name: str or None - The AWS Region. Can be None.
s3_endpoint: str or None - The S3 Endpoint. Can be None.
project: int - A unique integer value identifying this project.
source code
def get_export_storages(self): """Get Export (Target) Cloud Storage. Returns ------- list of dicts: List of dicts with target storages ------- Each dict consists of these fields: id : int A unique integer value identifying this storage. type : str The type of the storage. Default is "s3". synchronizable : bool Indicates if the storage is synchronizable. Default is True. last_sync : str or None The last sync finished time. Can be None. last_sync_count : int or None The count of tasks synced last time. Can be None. last_sync_job : str or None The last sync job ID. Can be None. status : str The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed". traceback : str or None The traceback report for the last failed sync. Can be None. meta : dict or None Meta and debug information about storage processes. Can be None. title : str or None The title of the cloud storage. Can be None. description : str or None The description of the cloud storage. Can be None. created_at : str The creation time of the storage. can_delete_objects : bool or None Deletion from storage enabled. Can be None. bucket : str or None The S3 bucket name. Can be None. prefix : str or None The S3 bucket prefix. Can be None. regex_filter : str or None The cloud storage regex for filtering objects. Can be None. use_blob_urls : bool Indicates if objects are interpreted as BLOBs and generate URLs. aws_access_key_id : str or None The AWS_ACCESS_KEY_ID. Can be None. aws_secret_access_key : str or None The AWS_SECRET_ACCESS_KEY. Can be None. aws_session_token : str or None The AWS_SESSION_TOKEN. Can be None. aws_sse_kms_key_id : str or None The AWS SSE KMS Key ID. Can be None. region_name : str or None The AWS Region. Can be None. s3_endpoint : str or None The S3 Endpoint. Can be None. project : int A unique integer value identifying this project. """ response = self.make_request("GET", f"/api/storages/export?project={self.id}") return response.json() def get_files_from_tasks(self, tasks: Dict, get_tasks: bool = False)-
Copy files from tasks to the cache folder
Parameters
tasks: Dict - Tasks to download to local storage
get_tasks: bool - Get all tasks from the current project
Returns
list - List of filenames
source code
def get_files_from_tasks(self, tasks: Dict, get_tasks: bool = False):
    """Copy files from tasks to cache folder

    Parameters
    ----------
    tasks: Dict
        Tasks to download to local storage
    get_tasks: bool
        Get all tasks from current project

    Returns
    -------
    list
        List of filenames
    """
    if get_tasks:
        tasks = self.get_tasks()
    filenames = []
    if tasks:
        for task in tasks:
            for key in task["data"]:
                try:
                    filename = get_local_path(
                        task["data"][key],
                        access_token=self.api_key,
                        hostname=self.url,
                    )
                    filenames.append(filename)
                except (FileNotFoundError, InvalidSchema, MissingSchema, IOError):
                    logger.debug(f"Couldn't copy file {task['data'][key]}.")
    return filenames

def get_import_storages(self)
Get import (source) cloud storage.
Returns
list of dicts - List of dicts with source storages.
Each dict consists of these fields:
id: int - A unique integer value identifying this storage.
type: str - The type of the storage. Default is "s3".
synchronizable: bool - Indicates if the storage is synchronizable. Default is True.
presign: bool - Indicates if the storage uses presigned URLs. Default is True.
last_sync: str or None - The last sync finished time. Can be None.
last_sync_count: int or None - The count of tasks synced last time. Can be None.
last_sync_job: str or None - The last sync job ID. Can be None.
status: str - The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed".
traceback: str or None - The traceback report for the last failed sync. Can be None.
meta: dict or None - Meta and debug information about storage processes. Can be None.
title: str or None - The title of the cloud storage. Can be None.
description: str or None - The description of the cloud storage. Can be None.
created_at: str - The creation time of the storage.
bucket: str or None - The S3 bucket name. Can be None.
prefix: str or None - The S3 bucket prefix. Can be None.
regex_filter: str or None - The cloud storage regex for filtering objects. Can be None.
use_blob_urls: bool - Indicates if objects are interpreted as BLOBs and generate URLs.
aws_access_key_id: str or None - The AWS_ACCESS_KEY_ID. Can be None.
aws_secret_access_key: str or None - The AWS_SECRET_ACCESS_KEY. Can be None.
aws_session_token: str or None - The AWS_SESSION_TOKEN. Can be None.
aws_sse_kms_key_id: str or None - The AWS SSE KMS Key ID. Can be None.
region_name: str or None - The AWS Region. Can be None.
s3_endpoint: str or None - The S3 Endpoint. Can be None.
presign_ttl: int - The presigned URLs TTL (in minutes).
recursive_scan: bool - Indicates if a recursive scan over the bucket content is performed.
glob_pattern: str or None - The glob pattern for syncing from the bucket. Can be None.
synced: bool - Flag indicating if the dataset has been previously synced or not.
source code
def get_import_storages(self): """Get Import (Source) Cloud Storage. Returns ------- list of dicts: List of dicts with source storages, each dict consists of these fields: ------- Each dict consists of these fields: id : int A unique integer value identifying this storage. type : str The type of the storage. Default is "s3". synchronizable : bool Indicates if the storage is synchronizable. Default is True. presign : bool Indicates if the storage is presign. Default is True. last_sync : str or None The last sync finished time. Can be None. last_sync_count : int or None The count of tasks synced last time. Can be None. last_sync_job : str or None The last sync job ID. Can be None. status : str The status of the storage. Can be one of "initialized", "queued", "in_progress", "failed", "completed". traceback : str or None The traceback report for the last failed sync. Can be None. meta : dict or None Meta and debug information about storage processes. Can be None. title : str or None The title of the cloud storage. Can be None. description : str or None The description of the cloud storage. Can be None. created_at : str The creation time of the storage. bucket : str or None The S3 bucket name. Can be None. prefix : str or None The S3 bucket prefix. Can be None. regex_filter : str or None The cloud storage regex for filtering objects. Can be None. use_blob_urls : bool Indicates if objects are interpreted as BLOBs and generate URLs. aws_access_key_id : str or None The AWS_ACCESS_KEY_ID. Can be None. aws_secret_access_key : str or None The AWS_SECRET_ACCESS_KEY. Can be None. aws_session_token : str or None The AWS_SESSION_TOKEN. Can be None. aws_sse_kms_key_id : str or None The AWS SSE KMS Key ID. Can be None. region_name : str or None The AWS Region. Can be None. s3_endpoint : str or None The S3 Endpoint. Can be None. presign_ttl : int The presigned URLs TTL (in minutes). recursive_scan : bool Indicates if a recursive scan over the bucket content is performed. glob_pattern : str or None The glob pattern for syncing from bucket. Can be None. synced : bool Flag indicating if the dataset has been previously synced or not. """ response = self.make_request("GET", f"/api/storages/?project={self.id}") return response.json() def get_labeled_tasks(self, only_ids=False)-
Retrieve all tasks that have been completed, i.e. where the requested number of annotations have been created
Parameters
only_ids: bool - Return only task IDs.
Returns
list - List of task dicts, the same as in get_tasks.
source code
def get_labeled_tasks(self, only_ids=False):
    """Retrieve all tasks that have been completed, i.e. where requested number
    of annotations have been created

    Parameters
    ----------
    only_ids: bool
        Return only task IDs.

    Returns
    -------
    list
        List of task dicts, the same as in `get_tasks`.
    """
    return self.get_tasks(
        filters={
            "conjunction": "and",
            "items": [
                {
                    "filter": "filter:tasks:completed_at",
                    "operator": "empty",
                    "value": False,
                    "type": "Datetime",
                }
            ],
        },
        only_ids=only_ids,
    )

def get_labeled_tasks_ids(self)
Retrieve all task IDs for completed tasks, i.e. where the requested number of annotations have been created
Returns
list - List of task IDs
source code
def get_labeled_tasks_ids(self):
    """Retrieve all task IDs for completed tasks, i.e. where requested number
    of annotations have been created

    Returns
    -------
    list
        List of task IDs
    """
    return self.get_labeled_tasks(only_ids=True)

def get_members(self)
Get members from this project. Enterprise only; use get_users() in the Community edition.
Returns
list of User - Project members
source code
def get_members(self):
    """Get members from this project.

    Returns
    -------
    list of `label_studio_sdk.users.User`
    """
    from .users import User

    assert self.is_enterprise, (
        "Project members are available in the Enterprise edition of Label Studio only. "
        "Use get_users() instead."
    )
    response = self.make_request("GET", f"/api/projects/{self.id}/members")
    users = []
    for user_data in response.json():
        user_data["client"] = self
        users.append(User(**user_data))
    return users

def get_model_versions(self)
Get the list of available ML model versions from pre-annotations or connected ML backends.
Returns
list of strings - Model versions
source code
def get_model_versions(self):
    """Get the list of available ML model versions from pre-annotations or
    connected ML backends.

    Returns
    -------
    list of strings
        Model versions
    """
    response = self.make_request("GET", f"/api/projects/{self.id}/model-versions")
    return response.json()

def get_paginated_tasks(self, filters=None, ordering=None, view_id=None, selected_ids=None, page: int = 1, page_size: int = 100, only_ids: bool = False, resolve_uri: bool = True)
Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a predefined view ID. Returns a 404 error for non-existent pages.
Parameters
filters: Filters.create() - JSON objects representing Data Manager filters. Use the Filters.create() helper to create them. Example:
{ "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] }
ordering: list of Column - List with one string representing Data Manager ordering. Use the Column helper class. Example: [Column.total_annotations], ['-' + Column.total_annotations] - inverted order
view_id: int - View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items
selected_ids: list of ints - Task IDs
page: int - Page. Default is 1.
page_size: int - Page size. Default is 100; to retrieve all tasks in the project, use get_tasks().
only_ids: bool - If true, return only task IDs
resolve_uri: bool - Resolve presigned URLs to https links
Returns
dict - Example:
{ "tasks": [{...}], "total_annotations": 50, "total_predictions": 100, "total": 100 }
tasks: list of dicts - Tasks with task data, annotations, predictions and other fields from the Data Manager
total: int - Total number of tasks in the filtered result
total_annotations: int - Total number of annotations in the filtered tasks
total_predictions: int - Total number of predictions in the filtered tasks
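A minimal pagination sketch (get_tasks() runs this loop for you):

```python
# Walk every page until the API reports the end of pagination.
page = 1
while True:
    data = project.get_paginated_tasks(page=page, page_size=100)
    if data.get("end_pagination"):
        break
    for task in data["tasks"]:
        print(task["id"])
    page += 1
```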
source code
def get_paginated_tasks( self, filters=None, ordering=None, view_id=None, selected_ids=None, page: int = 1, page_size: int = 100, only_ids: bool = False, resolve_uri: bool = True, ): """Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a predefined view ID. For non-existent pages it returns 404 error. Parameters ---------- filters: label_studio_sdk.data_manager.Filters.create() JSON objects representing Data Manager filters. Use `label_studio_sdk.data_manager.Filters.create()` helper to create it. Example: { "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] } ordering: list of label_studio_sdk.data_manager.Column List with <b>one</b> string representing Data Manager ordering. Use `label_studio_sdk.data_manager.Column` helper class. Example: ```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order view_id: int View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items selected_ids: list of ints Task IDs page: int Page. Default is 1. page_size: int Page size. Default is 100, to retrieve all tasks in the project you can use get_tasks(). only_ids: bool If true, return only task IDs resolve_uri: bool Resolve pre-sign urls to https links Returns ------- dict Example: { "tasks": [{...}], "total_annotations": 50, "total_predictions": 100, "total": 100 } tasks: list of dicts Tasks with task data, annotations, predictions and other fields from the Data Manager total: int Total number of tasks in filtered result total_annotations: int Total number of annotations in filtered tasks total_predictions: int Total number of predictions in filtered tasks """ query = { "filters": filters, "ordering": ordering or [], "selectedItems": ( {"all": False, "included": selected_ids} if selected_ids else {"all": True, "excluded": []} ), } params = { "project": self.id, "page": page, "page_size": page_size, "view": view_id, "query": json.dumps(query), "fields": "all", "resolve_uri": resolve_uri, } if only_ids: params["include"] = "id" response = self.make_request( "GET", "/api/tasks", params, raise_exceptions=False ) # we'll get 404 from API on empty page if response.status_code == 404: return {"tasks": [], "end_pagination": True} elif response.status_code != 200: self.log_response_error(response) try: response.raise_for_status() except HTTPError as e: raise LabelStudioException(f"Error loading tasks: {e}") data = response.json() tasks = data["tasks"] if only_ids: data["tasks"] = [task["id"] for task in tasks] return data def get_paginated_tasks_ids(self, *args, **kwargs)-
Same as Project.get_paginated_tasks() but returns only task IDs.
source code
def get_paginated_tasks_ids(self, *args, **kwargs):
    """Same as `label_studio_sdk.project.Project.get_paginated_tasks()` but
    returns only task IDs.
    """
    kwargs["only_ids"] = True
    return self.get_paginated_tasks(*args, **kwargs)

def get_params(self)
Get all available project parameters.
Returns
dict - containing all of the following params:
title: str - Project name.
description: str - Project description
label_config: str - Label config in XML format.
expert_instruction: str - Labeling instructions in HTML format
show_instruction: bool - Whether to display instructions to annotators before they start
show_skip_button: bool - Whether to show a skip button in the Label Studio UI and let annotators skip the task
enable_empty_annotation: bool - Allow annotators to submit empty annotations
show_annotation_history: bool - Show annotation history to annotator
organization: int - Organization ID
color: str - Color to decorate the project card in the Label Studio UI
maximum_annotations: int - Maximum number of annotations for one task. If the number of annotations per task is equal to or greater than this value, the task is finished and is_labeled=True is set. (Enterprise only)
is_published: bool - Whether or not the project is published to annotators (Enterprise only)
model_version: str - Machine learning model version for predictions or pre-annotations
is_draft: bool - Whether or not the project is in the middle of being created (Enterprise only)
created_by: object - Details about the user that created the project
min_annotations_to_start_training: int - Minimum number of completed tasks after which model training is started
show_collab_predictions: bool - Whether to show model predictions to the annotator, allowing them to collaborate with the ML model
sampling: str - Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only. Enum: "Sequential sampling", "Uniform sampling", "Uncertainty sampling"
show_ground_truth_first: bool - Whether to show tasks with ground truth annotations first (Enterprise only)
show_overlap_first: bool - Whether to show tasks with overlap first (Enterprise only)
overlap_cohort_percentage: int - Percentage of tasks that must be annotated multiple times. (Enterprise only)
task_data_login: str - User credentials for accessing task data. (Enterprise only)
task_data_password: str - Password credentials for accessing task data. (Enterprise only)
control_weights: object - Weights for control tags used when calculating agreement metrics. (Enterprise only)
evaluate_predictions_automatically: bool - Retrieve and display predictions when loading a task
source code
def get_params(self): """Get all available project parameters. Returns -------- dict containing all following params: title: str Project name. description: str Project description label_config: str Label config in XML format. expert_instruction: str Labeling instructions in HTML format show_instruction: bool Whether to display instructions to annotators before they start show_skip_button: bool Whether to show a skip button in the Label Studio UI and let annotators skip the task enable_empty_annotation: bool Allow annotators to submit empty annotations show_annotation_history: bool Show annotation history to annotator organization: int Organization ID color: str Color to decorate the project card in the Label Studio UI maximum_annotations: int Maximum number of annotations for one task. If the number of annotations per task is equal or greater to this value, the task is finished and is_labeled=True is set. (Enterprise only) is_published: bool Whether or not the project is published to annotators (Enterprise only) model_version: str Machine learning model version for predictions or pre-annotations is_draft: bool Whether or not the project is in the middle of being created (Enterprise only) created_by: object Details about the user that created the project min_annotations_to_start_training: int Minimum number of completed tasks after which model training is started show_collab_predictions: bool Whether to show model predictions to the annotator, allowing them to collaborate with the ML model sampling: str Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only. Enum: "Sequential sampling" "Uniform sampling" "Uncertainty sampling" show_ground_truth_first: bool Whether to show tasks with ground truth annotations first (Enterprise only) show_overlap_first: bool Whether to show tasks with overlap first (Enterprise only) overlap_cohort_percentage: int Percentage of tasks that must be annotated multiple times. (Enterprise only) task_data_login: str User credentials for accessing task data. (Enterprise only) task_data_password: str Password credentials for accessing task data. (Enterprise only) control_weights: object Weights for control tags used when calculating agreement metrics. (Enterprise only) evaluate_predictions_automatically: bool Retrieve and display predictions when loading a task """ response = self.make_request("GET", f"/api/projects/{self.id}") return response.json() def get_predictions_conflict(self)-
source code
def get_predictions_conflict(self):
    raise NotImplementedError

def get_predictions_coverage(self)
Prediction coverage stats for all model versions in the project.
Returns
dict - Example:
{ "2021-01-01": 0.9, "2021-02-01": 0.7 }
0.9 means that 90% of the project's tasks are covered by predictions with model_version "2021-01-01"
source code
def get_predictions_coverage(self):
    """Prediction coverage stats for all model versions for the project.

    Returns
    -------
    dict
        Example:
        { "2021-01-01": 0.9, "2021-02-01": 0.7 }
        `0.9` means that 90% of project tasks are covered by predictions
        with model_version `"2021-01-01"`
    """
    model_versions = self.get_model_versions()
    params = self.get_params()
    tasks_number = params["task_number"]
    coverage = {
        model_version: count / tasks_number
        for model_version, count in model_versions.items()
    }
    return coverage

def get_predictions_precision(self)
source code
def get_predictions_precision(self):
    raise NotImplementedError

def get_task(self, task_id)
Get a specific task by ID.
Parameters
task_id: int - Task ID you want to retrieve
Returns
dict - dict of task data containing all initial data and annotation results in the Label Studio JSON format:
id: int - Task ID
predictions: dict - Predictions object
annotations: dict - Annotations object
drafts: dict - Drafts object
data: object - User imported or uploaded data for a task. Data is formatted according to the project label config.
meta: object - Meta is user imported (uploaded) data and can be useful as input for an ML Backend for embeddings, advanced vectors, and other info. It is passed to ML during training/predicting steps. (Deprecated)
created_at: str - Date time string representing the time a task was created.
updated_at: str - Date time string representing the last time a task was updated.
is_labeled: bool - True if the number of annotations for this task is greater than or equal to the number of maximum_completions for the project.
overlap: int - Number of distinct annotators that processed the current task.
project: int - Project ID for this task
file_upload: str - Uploaded file used as the data source for this task
source code
def get_task(self, task_id): """Get specific task by ID. Parameters ---------- task_id: int Task ID you want to retrieve Returns ------- dict: dict of task data containing all initial data and annotation results in [Label Studio JSON format](https://labelstud.io/guide/tasks.html#Basic-Label-Studio-JSON-format) ``` id: int Task ID predictions: dict Predictions object annotations: dict Annotations object drafts: dict Drafts object data: object User imported or uploaded data for a task. Data is formatted according to the project label config. meta: object Meta is user imported (uploaded) data and can be useful as input for an ML Backend for embeddings, advanced vectors, and other info. It is passed to ML during training/predicting steps. (Deprecated) created_at: str Date time string representing the time a task was created. updated_at: str Date time string representing the last time a task was updated. is_labeled: bool True if the number of annotations for this task is greater than or equal to the number of maximum_completions for the project. overlap: int Number of distinct annotators that processed the current task. project: int Project ID for this task file_upload: str Uploaded file used as data source for this task ``` """ response = self.make_request("GET", f"/api/tasks/{task_id}") return response.json() def get_tasks(self, filters=None, ordering=None, view_id=None, selected_ids=None, only_ids: bool = False)-
Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a predefined view ID.
Parameters
filters: Filters.create() - JSON objects representing Data Manager filters. Use the Filters.create() helper to create them. Example:
{ "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] }
ordering: list of Column - List with one string representing Data Manager ordering. Use the Column helper class. Example: [Column.total_annotations], ['-' + Column.total_annotations] - inverted order
view_id: int - View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items
selected_ids: list of ints - Task IDs
only_ids: bool - If true, return only task IDs
Returns
list - Task list with task data, annotations, predictions and other fields from the Data Manager
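A minimal sketch using the raw filter format documented above (the task ID value is a placeholder):

```python
# Fetch the task whose ID equals 1; drop `filters` to fetch every task.
tasks = project.get_tasks(filters={
    "conjunction": "and",
    "items": [{
        "filter": "filter:tasks:id",
        "operator": "equal",
        "type": "Number",
        "value": 1,
    }],
})
print([task["id"] for task in tasks])
```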
source code
def get_tasks( self, filters=None, ordering=None, view_id=None, selected_ids=None, only_ids: bool = False, ): """Retrieve a subset of tasks from the Data Manager based on a filter, ordering mechanism, or a predefined view ID. Parameters ---------- filters: label_studio_sdk.data_manager.Filters.create() JSON objects representing Data Manager filters. Use `label_studio_sdk.data_manager.Filters.create()` helper to create it. Example: ```json { "conjunction": "and", "items": [ { "filter": "filter:tasks:id", "operator": "equal", "type": "Number", "value": 1 } ] } ``` ordering: list of label_studio_sdk.data_manager.Column List with <b>one</b> string representing Data Manager ordering. Use `label_studio_sdk.data_manager.Column` helper class. Example: ```[Column.total_annotations]```, ```['-' + Column.total_annotations]``` - inverted order view_id: int View ID, visible as a Data Manager tab, for which to retrieve filters, ordering, and selected items selected_ids: list of ints Task IDs only_ids: bool If true, return only task IDs Returns ------- list Task list with task data, annotations, predictions and other fields from the Data Manager """ page = 1 result = [] data = {} while not data.get("end_pagination"): try: data = self.get_paginated_tasks( filters=filters, ordering=ordering, view_id=view_id, selected_ids=selected_ids, only_ids=only_ids, page=page, page_size=100, ) result += data["tasks"] page += 1 except LabelStudioException as e: logger.debug(f"Error during pagination: {e}") break return result def get_tasks_ids(self, *args, **kwargs)-
Same as Project.get_tasks() but returns only task IDs.
source code
def get_tasks_ids(self, *args, **kwargs):
    """Same as `label_studio_sdk.project.Project.get_tasks()` but returns only task IDs."""
    kwargs["only_ids"] = True
    return self.get_tasks(*args, **kwargs)

def get_unlabeled_tasks(self, only_ids=False)
Retrieve all tasks that are not completed. If using Label Studio Enterprise, this can include tasks that have been labeled one or more times, but not the full number of times defined in the project labeling settings.
Parameters
only_ids: bool - Return only task IDs
Returns
list - List of task dicts, the same as in get_tasks.
source code
def get_unlabeled_tasks(self, only_ids=False):
    """Retrieve all tasks that are <b>not</b> completed. If using Label Studio
    Enterprise, this can include tasks that have been labeled one or more times,
    but not the full number of times defined in the project labeling settings.

    Parameters
    ----------
    only_ids: bool
        Return only task IDs

    Returns
    -------
    list
        List of task dicts, the same as in `get_tasks`.
    """
    return self.get_tasks(
        filters={
            "conjunction": "and",
            "items": [
                {
                    "filter": "filter:tasks:completed_at",
                    "operator": "empty",
                    "value": True,
                    "type": "Datetime",
                }
            ],
        },
        only_ids=only_ids,
    )

def get_unlabeled_tasks_ids(self)
Retrieve all task IDs for tasks that are not completed. If using Label Studio Enterprise, this can include tasks that have been labeled one or more times, but not the full number of times defined in the project labeling settings.
Returns
list - List of task IDs
source code
def get_unlabeled_tasks_ids(self):
    """Retrieve all task IDs for tasks that are <b>not</b> completed. If using
    Label Studio Enterprise, this can include tasks that have been labeled
    one or more times, but not the full number of times defined in the
    project labeling settings.

    Returns
    -------
    list
        List of task IDs
    """
    return self.get_unlabeled_tasks(only_ids=True)
def get_views(self)

Get all views related to the project.

Returns

list - List of view dicts.

Each dict contains the following fields:
id: int - View ID
project: int - Project ID
user: int - User ID of the user who created this tab
data: dict - Filters, orderings and other visual settings

source code (Browse Git)
def get_views(self):
    """Get all views related to the project.

    Returns
    -------
    list
        List of view dicts.

        Each dict contains the following fields:
        id: int
            View ID
        project: int
            Project ID
        user: int
            User ID of the user who created this tab
        data: dict
            Filters, orderings and other visual settings
    """
    response = self.make_request("GET", f"/api/dm/views?project={self.id}")
    return response.json()
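For example, to reuse the filters saved on an existing Data Manager tab (a sketch building on the `project` object above):

```python
# Fetch exactly the tasks shown by the first Data Manager tab, if any
views = project.get_views()
if views:
    tab_tasks = project.get_tasks(view_id=views[0]["id"])
    print(f"Tab {views[0]['id']} contains {len(tab_tasks)} tasks")
```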
def import_tasks(self, tasks, preannotated_from_fields: List = None)

Import JSON-formatted labeling tasks. Tasks can be unlabeled or contain predictions.

Parameters

tasks: list of dicts | dict | path to file - Tasks in Label Studio JSON format
preannotated_from_fields: list of strings - Turns flat task JSON formatted like {"column1": value, "column2": value} into the Label Studio prediction data format {"data": {"column1"..}, "predictions": [{..."column2"}]}. Useful when all your data is stored in tabular format with one column dedicated to model predictions.

Returns

list of int - Imported task IDs

source code (Browse Git)
def import_tasks(self, tasks, preannotated_from_fields: List = None):
    """Import JSON-formatted labeling tasks. Tasks can be unlabeled
    or contain predictions.

    Parameters
    ----------
    tasks: list of dicts | dict | path to file
        Tasks in <a href="https://labelstud.io/guide/tasks.html#Basic-Label-Studio-JSON-format">
        Label Studio JSON format</a>
    preannotated_from_fields: list of strings
        Turns flat task JSON formatted like: `{"column1": value, "column2": value}`
        into Label Studio prediction data format: `{"data": {"column1"..}, "predictions": [{..."column2"}]}`
        Useful when all your data is stored in tabular format with one column dedicated to model predictions.

    Returns
    -------
    list of int
        Imported task IDs
    """
    params = {"return_task_ids": "1"}
    if preannotated_from_fields:
        params["preannotated_from_fields"] = ",".join(preannotated_from_fields)

    if isinstance(tasks, (list, dict)):
        response = self.make_request(
            method="POST",
            url=f"/api/projects/{self.id}/import",
            json=tasks,
            params=params,
            timeout=(10, 600),
        )
    elif isinstance(tasks, (str, Path)):
        # try import from file
        if not os.path.isfile(tasks):
            raise LabelStudioException(f"Not found import tasks file {tasks}")
        with open(tasks, mode="rb") as f:
            response = self.make_request(
                method="POST",
                url=f"/api/projects/{self.id}/import",
                files={"file": f},
                params=params,
                timeout=(10, 600),
            )
    else:
        raise TypeError(
            f'Not supported type provided as "tasks" argument: {type(tasks)}'
        )

    response = response.json()
    if "import" in response:
        # check import status, retrying with Fibonacci backoff until done
        timeout = 300
        fibonacci_backoff = [1, 1]
        start_time = time.time()
        while True:
            import_status = self.make_request(
                method="GET",
                url=f'/api/projects/{self.id}/imports/{response["import"]}',
            ).json()
            if import_status["status"] == "completed":
                return import_status["task_ids"]
            if import_status["status"] == "failed":
                raise LabelStudioException(import_status["error"])
            if time.time() - start_time >= timeout:
                raise LabelStudioException("Import timeout")
            time.sleep(fibonacci_backoff[0])
            fibonacci_backoff = [
                fibonacci_backoff[1],
                fibonacci_backoff[0] + fibonacci_backoff[1],
            ]
    return response["task_ids"]
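Two common call shapes, as a sketch (the `text` and `sentiment` field names are made-up examples, not part of the SDK):

```python
# 1. Plain unlabeled tasks
task_ids = project.import_tasks(
    [
        {"text": "The food was great"},
        {"text": "Terrible service"},
    ]
)

# 2. Tabular rows where one column holds model output:
#    "text" becomes task data, "sentiment" becomes a prediction
task_ids = project.import_tasks(
    [
        {"text": "The food was great", "sentiment": "positive"},
        {"text": "Terrible service", "sentiment": "negative"},
    ],
    preannotated_from_fields=["sentiment"],
)
```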
def list_annotations(self, task_id: int) -> List

List all annotations for a task.

Parameters

task_id: int - Task ID

Returns

list of dict - List of annotation objects

source code (Browse Git)
def list_annotations(self, task_id: int) -> List:
    """List all annotations for a task.

    Parameters
    ----------
    task_id: int
        Task ID

    Returns
    -------
    list of dict:
        List of annotation objects
    """
    response = self.make_request("GET", f"/api/tasks/{task_id}/annotations")
    response.raise_for_status()
    return response.json()
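For example, collecting every annotation in the project keyed by task ID (a sketch combining this with `get_tasks_ids()` above; it makes one request per task, so it can be slow on large projects):

```python
# Map each task ID to its list of annotation objects
all_annotations = {
    task_id: project.list_annotations(task_id)
    for task_id in project.get_tasks_ids()
}
```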
def set_model_version(self, model_version: str)

Set the current model version to use for displaying predictions to annotators, performing uncertainty sampling and annotation evaluations in Label Studio Enterprise, and other operations.

Parameters

model_version: string - Any string you want to use as the model version

source code (Browse Git)
def set_model_version(self, model_version: str):
    """Set the current model version to use for displaying predictions to
    annotators, perform uncertainty sampling and annotation evaluations in
    Label Studio Enterprise, and other operations.

    Parameters
    ----------
    model_version: string
        It can be any string you want
    """
    self.set_params(model_version=model_version)
def set_params(self, **kwargs)

Low-level function to set project parameters.

source code (Browse Git)
def set_params(self, **kwargs):
    """Low level function to set project parameters."""
    response = self.make_request("PATCH", f"/api/projects/{self.id}", json=kwargs)
    assert response.status_code == 200
def set_published(self, is_published: bool)

Set the project publication state. (Enterprise only)

Parameters

is_published: bool - Project publication state for reviewers and annotators

source code (Browse Git)
def set_published(self, is_published: bool):
    """Set the project publication state. (Enterprise only)

    Parameters
    ----------
    is_published: bool
        Project publication state for reviewers and annotators
    """
    self.set_params(is_published=is_published)
def set_sampling(self, sampling: ProjectSampling)

Set the project sampling method for the labeling stream.

source code (Browse Git)
def set_sampling(self, sampling: ProjectSampling):
    """Set the project sampling method for the labeling stream."""
    self.set_params(sampling=sampling.value)
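For example, with the `ProjectSampling` enum documented at the end of this module:

```python
from label_studio_sdk.project import ProjectSampling

# Serve tasks to annotators in uniform random order
project.set_sampling(ProjectSampling.RANDOM)
```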
def start_project(self, **kwargs)

Create a new labeling project in Label Studio.

Parameters

title: str - Project name.
description: str - Project description
label_config: str - Label config in XML format.
expert_instruction: str - Labeling instructions in HTML format
show_instruction: bool - Whether to display instructions to annotators before they start
show_skip_button: bool - Whether to show a skip button in the Label Studio UI and let annotators skip the task
enable_empty_annotation: bool - Allow annotators to submit empty annotations
show_annotation_history: bool - Show annotation history to annotator
organization: int - Organization ID
color: str - Color to decorate the project card in the Label Studio UI
maximum_annotations: int - Maximum number of annotations for one task. If the number of annotations per task is equal or greater to this value, the task is finished and is_labeled=True is set. (Enterprise only)
is_published: bool - Whether or not the project is published to annotators (Enterprise only)
model_version: str - Machine learning model version for predictions or pre-annotations
is_draft: bool - Whether or not the project is in the middle of being created (Enterprise only)
created_by: object - Details about the user that created the project
min_annotations_to_start_training: int - Minimum number of completed tasks after which model training is started
show_collab_predictions: bool - Whether to show model predictions to the annotator, allowing them to collaborate with the ML model
sampling: str - Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only. Enum: "Sequential sampling", "Uniform sampling", "Uncertainty sampling"
show_ground_truth_first: bool - Whether to show tasks with ground truth annotations first (Enterprise only)
show_overlap_first: bool - Whether to show tasks with overlap first (Enterprise only)
overlap_cohort_percentage: int - Percentage of tasks that must be annotated multiple times. (Enterprise only)
task_data_login: str - User credentials for accessing task data. (Enterprise only)
task_data_password: str - Password credentials for accessing task data. (Enterprise only)
control_weights: object - Weights for control tags used when calculating agreement metrics. (Enterprise only)
evaluate_predictions_automatically: bool - Retrieve and display predictions when loading a task

Raises a LabelStudioException in case of errors.

source code (Browse Git)
def start_project(self, **kwargs):
    """Create a new labeling project in Label Studio.

    Parameters
    ----------
    title: str
        Project name.
    description: str
        Project description
    label_config: str
        Label config in XML format.
    expert_instruction: str
        Labeling instructions in HTML format
    show_instruction: bool
        Whether to display instructions to annotators before they start
    show_skip_button: bool
        Whether to show a skip button in the Label Studio UI and let annotators skip the task
    enable_empty_annotation: bool
        Allow annotators to submit empty annotations
    show_annotation_history: bool
        Show annotation history to annotator
    organization: int
        Organization ID
    color: str
        Color to decorate the project card in the Label Studio UI
    maximum_annotations: int
        Maximum number of annotations for one task. If the number of annotations per task is equal or greater
        to this value, the task is finished and is_labeled=True is set. (Enterprise only)
    is_published: bool
        Whether or not the project is published to annotators (Enterprise only)
    model_version: str
        Machine learning model version for predictions or pre-annotations
    is_draft: bool
        Whether or not the project is in the middle of being created (Enterprise only)
    created_by: object
        Details about the user that created the project
    min_annotations_to_start_training: int
        Minimum number of completed tasks after which model training is started
    show_collab_predictions: bool
        Whether to show model predictions to the annotator, allowing them to collaborate with the ML model
    sampling: str
        Type of sampling to use for task labeling. Uncertainty sampling is Enterprise only.
        Enum: "Sequential sampling" "Uniform sampling" "Uncertainty sampling"
    show_ground_truth_first: bool
        Whether to show tasks with ground truth annotations first (Enterprise only)
    show_overlap_first: bool
        Whether to show tasks with overlap first (Enterprise only)
    overlap_cohort_percentage: int
        Percentage of tasks that must be annotated multiple times. (Enterprise only)
    task_data_login: str
        User credentials for accessing task data. (Enterprise only)
    task_data_password: str
        Password credentials for accessing task data. (Enterprise only)
    control_weights: object
        Weights for control tags used when calculating agreement metrics. (Enterprise only)
    evaluate_predictions_automatically: bool
        Retrieve and display predictions when loading a task

    Raises LabelStudioException in case of errors.
    """
    response = self.make_request("POST", "/api/projects", json=kwargs)
    if response.status_code == 201:
        self.params = response.json()
    else:
        raise LabelStudioException("Project not created")
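In practice projects are usually created through the client, which returns a ready-to-use `Project` (a sketch; the labeling config is a minimal made-up sentiment example):

```python
from label_studio_sdk import Client

ls = Client(url="http://localhost:8080", api_key="YOUR_API_KEY")
project = ls.start_project(
    title="Sentiment review",
    label_config="""
    <View>
      <Text name="text" value="$text"/>
      <Choices name="sentiment" toName="text">
        <Choice value="positive"/>
        <Choice value="negative"/>
      </Choices>
    </View>
    """,
)
```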
def sync_export_storage(self, storage_type, storage_id)

Synchronize Export (Target) Cloud Storage.

Parameters

storage_type: string - Type of the storage container. See ProjectStorage for available types.
storage_id: int - Storage ID of the storage container. See get_export_storages() to get IDs.

Returns

dict - Contains the same fields as in the original storage request plus:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
other fields: see https://api.labelstud.io/#tag/Storage:S3/operation/api_storages_export_s3_sync_create

source code (Browse Git)
def sync_export_storage(self, storage_type, storage_id):
    """Synchronize Export (Target) Cloud Storage.

    Parameters
    ----------
    storage_type: string
        Specify the type of the storage container. See ProjectStorage for available types.
    storage_id: int
        Specify the storage ID of the storage container. See get_export_storages() to get ids.

    Returns
    -------
    dict:
        containing the same fields as in the original storage request and:

        id: int
            Storage ID
        type: str
            Type of storage
        created_at: str
            Creation time
        other fields:
            See more https://api.labelstud.io/#tag/Storage:S3/operation/api_storages_export_s3_sync_create
    """
    response = self.make_request(
        "POST", f"/api/storages/export/{storage_type}/{str(storage_id)}/sync"
    )
    return response.json()
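A sketch that triggers a sync on every target storage attached to the project (it assumes, as the docstring above suggests, that `get_export_storages()` returns dicts exposing `type` and `id`):

```python
# Push finished annotations out to all configured export storages
for storage in project.get_export_storages():
    result = project.sync_export_storage(storage["type"], storage["id"])
    print(result["id"], result["type"], result["created_at"])
```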
def sync_import_storage(self, storage_type, storage_id)

Synchronize Import (Source) Cloud Storage.

Parameters

storage_type: string - Type of the storage container. See ProjectStorage for available types.
storage_id: int - Storage ID of the storage container. See get_import_storages() to get IDs.

Returns

dict - Contains the same fields as in the original storage request plus:
id: int - Storage ID
type: str - Type of storage
created_at: str - Creation time
last_sync: str - Time the last sync finished; can be empty
last_sync_count: int - Number of tasks synced in the last sync

source code (Browse Git)
def sync_import_storage(self, storage_type, storage_id):
    """Synchronize Import (Source) Cloud Storage.

    Parameters
    ----------
    storage_type: string
        Specify the type of the storage container. See ProjectStorage for available types.
    storage_id: int
        Specify the storage ID of the storage container. See get_import_storages() to get ids.

    Returns
    -------
    dict:
        containing the same fields as in the original storage request and:

        id: int
            Storage ID
        type: str
            Type of storage
        created_at: str
            Creation time
        last_sync: str
            Time last sync finished, can be empty.
        last_sync_count: int
            Number of tasks synced in the last sync
    """
    # originally sync was implemented in Client class, keep it for compatibility
    response = self.make_request(
        "POST", f"/api/storages/{storage_type}/{str(storage_id)}/sync"
    )
    return response.json()
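For a single known bucket, the `ProjectStorage` enum below supplies the `storage_type` string (a sketch; the storage ID 7 is a placeholder):

```python
from label_studio_sdk.project import ProjectStorage

# Pull any new objects from an attached S3 source storage into tasks
status = project.sync_import_storage(ProjectStorage.S3.value, 7)
print(status.get("last_sync_count"), "tasks synced in the last run")
```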
def update_annotation(self, annotation_id, **kwargs)

Update a specific annotation with new annotation parameters, e.g.
project.update_annotation(annotation_id=123, ground_truth=True)

Parameters

annotation_id: int - Existing annotation ID from the current project. Can be retrieved from a project.get_tasks() response.
kwargs: kwargs parameters - Annotation parameters. Check all available parameters at https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks

Returns

dict - Dict with the updated annotation

source code (Browse Git)
def update_annotation(self, annotation_id, **kwargs):
    """Update specific annotation with new annotation parameters, e.g.
    ```
    project.update_annotation(annotation_id=123, ground_truth=True)
    ```

    Parameters
    ----------
    annotation_id: int
        Existing annotation ID from current project. Could be retrieved from `project.get_tasks()` response
    kwargs: kwargs parameters
        List of annotation parameters. Check all available parameters
        [here](https://labelstud.io/guide/export.html#Label-Studio-JSON-format-of-annotated-tasks)

    Returns
    -------
    dict
        Dict with updated annotation
    """
    response = self.make_request(
        "PATCH", f"/api/annotations/{annotation_id}", json=kwargs
    )
    response.raise_for_status()
    return response.json()
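For example, promoting the first annotation on a task to ground truth (a sketch combining this with `list_annotations()` above; task ID 42 is a placeholder):

```python
# Mark the first annotation on task 42 as ground truth
annotations = project.list_annotations(42)
if annotations:
    updated = project.update_annotation(annotations[0]["id"], ground_truth=True)
    print(updated["id"], updated.get("ground_truth"))
```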
def update_params(self)

Get all available project parameters and cache them.

source code (Browse Git)
def update_params(self):
    """Get [all available project parameters](#label_studio_sdk.project.Project.get_params) and cache them."""
    self.params = self.get_params()
def update_task(self, task_id, **kwargs)

Update a specific task by ID.

Parameters

task_id: int - Task ID you want to update
kwargs: kwargs parameters - Parameters to update. Check all available parameters at https://labelstud.io/api#operation/api_tasks_partial_update

Returns

dict - Dict with the updated task

source code (Browse Git)
def update_task(self, task_id, **kwargs):
    """Update specific task by ID.

    Parameters
    ----------
    task_id: int
        Task ID you want to update
    kwargs: kwargs parameters
        List of parameters to update. Check all available parameters
        [here](https://labelstud.io/api#operation/api_tasks_partial_update)

    Returns
    -------
    dict:
        Dict with updated task
    """
    response = self.make_request("PATCH", f"/api/tasks/{task_id}", json=kwargs)
    response.raise_for_status()
    return response.json()
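For instance, stashing bookkeeping information in the task `meta` field described under `get_task()` (a sketch; the payload is an arbitrary example):

```python
# Attach extra metadata to a task without touching its data
updated = project.update_task(42, meta={"source_batch": "2024-01-export"})
print(updated["meta"])
```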
class ProjectSampling (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumerate the available task sampling modes for labeling.

source code (Browse Git)
class ProjectSampling(Enum):
    """Enumerate the available task sampling modes for labeling."""

    RANDOM = "Uniform sampling"
    """ Uniform random sampling of tasks """
    SEQUENCE = "Sequential sampling"
    """ Sequential sampling of tasks using task IDs """
    UNCERTAINTY = "Uncertainty sampling"
    """ Sample tasks based on prediction scores, such as for active learning (Enterprise only)"""

Constants

RANDOM - Uniform random sampling of tasks
SEQUENCE - Sequential sampling of tasks using task IDs
UNCERTAINTY - Sample tasks based on prediction scores, such as for active learning (Enterprise only)
class ProjectStorage (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumerate the available types of external source and target storage for labeling projects.

source code (Browse Git)
class ProjectStorage(Enum):
    """Enumerate the available types of external source and target storage for labeling projects."""

    GOOGLE = "gcs"
    """ Google Cloud Storage """
    S3 = "s3"
    """ Amazon S3 Storage """
    AZURE = "azure_blob"
    """ Microsoft Azure Blob Storage """
    LOCAL = "localfiles"
    """ Label Studio Local File Storage """
    REDIS = "redis"
    """ Redis Storage """
    S3_SECURED = "s3s"
    """ Amazon S3 Storage secured by IAM roles (Enterprise only) """

Constants

AZURE - Microsoft Azure Blob Storage
GOOGLE - Google Cloud Storage
LOCAL - Label Studio Local File Storage
REDIS - Redis Storage
S3 - Amazon S3 Storage
S3_SECURED - Amazon S3 Storage secured by IAM roles (Enterprise only)