Exploring remote datasets creates virtual dataset #8708

Draft · wants to merge 5 commits into master
8 changes: 4 additions & 4 deletions app/controllers/DatasetController.scala
@@ -145,10 +145,10 @@ class DatasetController @Inject()(userService: UserService,
_ <- Fox.fromBool(dataSource.dataLayers.nonEmpty) ?~> "dataset.explore.zeroLayers"
folderIdOpt <- Fox.runOptional(request.body.folderPath)(folderPath =>
folderService.getOrCreateFromPathLiteral(folderPath, request.identity._organization)) ?~> "dataset.explore.autoAdd.getFolder.failed"
_ <- wkExploreRemoteLayerService.addRemoteDatasource(dataSource,
request.body.datasetName,
request.identity,
folderIdOpt) ?~> "dataset.explore.autoAdd.failed"
_ <- wkExploreRemoteLayerService.addRemoteDatasourceToDatabase(dataSource,
request.body.datasetName,
request.identity,
folderIdOpt) ?~> "dataset.explore.autoAdd.failed"
} yield Ok
}

30 changes: 29 additions & 1 deletion app/controllers/WKRemoteDataStoreController.scala
@@ -9,7 +9,7 @@ import com.scalableminds.webknossos.datastore.helpers.{LayerMagLinkInfo, MagLink
import com.scalableminds.webknossos.datastore.models.UnfinishedUpload
import com.scalableminds.webknossos.datastore.models.datasource.DataSourceId
import com.scalableminds.webknossos.datastore.models.datasource.inbox.{InboxDataSourceLike => InboxDataSource}
import com.scalableminds.webknossos.datastore.services.{DataSourcePathInfo, DataStoreStatus}
import com.scalableminds.webknossos.datastore.services.{DataSourcePathInfo, DataSourceRegistrationInfo, DataStoreStatus}
import com.scalableminds.webknossos.datastore.services.uploading.{
LinkedLayerIdentifier,
ReserveAdditionalInformation,
@@ -270,6 +270,34 @@ class WKRemoteDataStoreController @Inject()(

}

// Register a datasource from the datastore as a dataset in the database.
// This is called when adding remote virtual datasets (that should only exist in the database)
// by the data store after exploration.
def registerDataSource(name: String,
key: String,
organizationId: String,
directoryName: String,
token: String): Action[DataSourceRegistrationInfo] =
Action.async(validateJson[DataSourceRegistrationInfo]) { implicit request =>
dataStoreService.validateAccess(name, key) { dataStore =>
for {
user <- bearerTokenService.userForToken(token)
organization <- organizationDAO.findOne(organizationId)(GlobalAccessContext) ?~> Messages(
"organization.notFound",
organizationId) ~> NOT_FOUND
_ <- Fox.fromBool(organization._id == user._organization) ?~> "notAllowed" ~> FORBIDDEN
dataset <- datasetService.createVirtualDataset(
directoryName,
organizationId,
dataStore,
request.body.dataSource,
request.body.folderId,
user
)
} yield Ok(dataset._id.toString)
}
}

def jobExportProperties(name: String, key: String, jobId: ObjectId): Action[AnyContent] = Action.async {
implicit request =>
dataStoreService.validateAccess(name, key) { _ =>
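Note: the request body type DataSourceRegistrationInfo is imported above but defined in the datastore module, outside this diff. Judging only from the fields accessed here (request.body.dataSource and request.body.folderId), a minimal sketch of its shape could look like the following — the actual definition may differ or carry additional fields:

// Hypothetical reconstruction from usage in registerDataSource; not the authoritative definition.
import com.scalableminds.webknossos.datastore.models.datasource.DataSource
import play.api.libs.json.{Json, OFormat}

case class DataSourceRegistrationInfo(
    dataSource: DataSource, // the explored data source to persist as a virtual dataset
    folderId: Option[String] // optional target folder for the new dataset
)

object DataSourceRegistrationInfo {
  implicit val jsonFormat: OFormat[DataSourceRegistrationInfo] = Json.format[DataSourceRegistrationInfo]
}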
4 changes: 2 additions & 2 deletions app/models/dataset/Dataset.scala
@@ -762,8 +762,8 @@ class DatasetMagsDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionConte
layer.magsOpt match {
case Some(mags) =>
mags.map(mag => {
q"""INSERT INTO webknossos.dataset_mags(_dataset, dataLayerName, mag, axisOrder, channelIndex, credentialId)
VALUES($datasetId, ${layer.name}, ${mag.mag}, ${mag.axisOrder
q"""INSERT INTO webknossos.dataset_mags(_dataset, dataLayerName, mag, path, axisOrder, channelIndex, credentialId)
VALUES($datasetId, ${layer.name}, ${mag.mag}, ${mag.path}, ${mag.axisOrder
.map(Json.toJson(_))}, ${mag.channelIndex}, ${mag.credentialId})
""".asUpdate
})
70 changes: 61 additions & 9 deletions app/models/dataset/DatasetService.scala
@@ -1,6 +1,6 @@
package models.dataset

import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext}
import com.scalableminds.util.accesscontext.{AuthorizedAccessContext, DBAccessContext, GlobalAccessContext}
import com.scalableminds.util.objectid.ObjectId
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.{Fox, FoxImplicits}
@@ -9,6 +9,8 @@ import com.scalableminds.webknossos.datastore.dataformats.layers.{
N5SegmentationLayer,
PrecomputedDataLayer,
PrecomputedSegmentationLayer,
WKWDataLayer,
WKWSegmentationLayer,
Zarr3DataLayer,
Zarr3SegmentationLayer,
ZarrDataLayer,
@@ -23,6 +25,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.{
AbstractDataLayer,
AbstractSegmentationLayer,
DataFormat,
DataSource,
DataSourceId,
GenericDataSource,
DataLayerLike => DataLayer
@@ -50,7 +53,6 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
datasetLastUsedTimesDAO: DatasetLastUsedTimesDAO,
datasetDataLayerDAO: DatasetLayerDAO,
datasetMagsDAO: DatasetMagsDAO,
datasetLayerAttachmentsDAO: DatasetLayerAttachmentsDAO,
teamDAO: TeamDAO,
folderDAO: FolderDAO,
dataStoreService: DataStoreService,
@@ -63,7 +65,9 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
with LazyLogging {
private val unreportedStatus = datasetDAO.unreportedStatus
private val notYetUploadedStatus = "Not yet fully uploaded."
private val inactiveStatusList = List(unreportedStatus, notYetUploadedStatus, datasetDAO.deletedByUserStatus)
private val virtualRemoteDatasetStatus = "Virtual remote dataset" // Virtual datasets should not be deleted when not reported
private val inactiveStatusList =
List(unreportedStatus, notYetUploadedStatus, datasetDAO.deletedByUserStatus, virtualRemoteDatasetStatus)

def assertValidDatasetName(name: String): Fox[Unit] =
for {
@@ -97,6 +101,32 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
} yield newDataset
}

def createVirtualDataset(datasetName: String,
organizationId: String,
dataStore: DataStore,
dataSource: DataSource,
folderId: Option[String],
user: User): Fox[Dataset] =
for {
_ <- assertValidDatasetName(datasetName)
isDatasetNameAlreadyTaken <- datasetDAO.doesDatasetDirectoryExistInOrganization(datasetName, organizationId)(
GlobalAccessContext)
_ <- Fox.fromBool(!isDatasetNameAlreadyTaken) ?~> "dataset.name.alreadyTaken"
organization <- organizationDAO.findOne(organizationId)(GlobalAccessContext) ?~> "organization.notFound"
folderId <- ObjectId.fromString(folderId.getOrElse(organization._rootFolder.toString)) ?~> "dataset.upload.folderId.invalid"
_ <- folderDAO.assertUpdateAccess(folderId)(AuthorizedAccessContext(user)) ?~> "folder.noWriteAccess"
newDatasetId = ObjectId.generate
abstractDataSource = dataSource.copy(dataLayers = dataSource.dataLayers.map(_.asAbstractLayer))
dataset <- createDataset(dataStore,
newDatasetId,
datasetName,
abstractDataSource,
status = Some(virtualRemoteDatasetStatus))
datasetId = dataset._id
_ <- datasetDAO.updateFolder(datasetId, folderId)(GlobalAccessContext)
_ <- addUploader(dataset, user._id)(GlobalAccessContext)
} yield dataset

def getAllUnfinishedDatasetUploadsOfUser(userId: ObjectId, organizationId: String)(
implicit ctx: DBAccessContext): Fox[List[DatasetCompactInfo]] =
datasetDAO.findAllCompactWithSearch(
@@ -114,7 +144,8 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
datasetId: ObjectId,
datasetName: String,
dataSource: InboxDataSource,
publication: Option[ObjectId] = None
publication: Option[ObjectId] = None,
status: Option[String] = None
): Fox[Dataset] = {
implicit val ctx: DBAccessContext = GlobalAccessContext
val metadata =
Expand Down Expand Up @@ -147,7 +178,7 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
name = datasetName,
voxelSize = dataSource.voxelSizeOpt,
sharingToken = None,
status = dataSource.statusOpt.getOrElse(""),
status = status.orElse(dataSource.statusOpt).getOrElse(""),
logoUrl = None,
metadata = metadata
)
@@ -331,8 +362,18 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
case Some(df) =>
df match {
case DataFormat.wkw =>
throw new NotImplementedError(
"WKW data format not supported in this context, only datasets with MagLocators are supported")
WKWDataLayer(
name,
category,
boundingBox,
mags,
elementClass,
defaultViewConfiguration,
adminViewConfiguration,
coordinateTransformations,
additionalAxes,
attachmentsOpt
)
case DataFormat.neuroglancerPrecomputed =>
PrecomputedDataLayer(
name,
@@ -413,8 +454,19 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
case Some(df) =>
df match {
case DataFormat.wkw =>
throw new NotImplementedError(
"WKW data format not supported in this context, only datasets with MagLocators are supported")
WKWSegmentationLayer(
name,
boundingBox,
mags,
elementClass,
mappings,
largestSegmentId,
defaultViewConfiguration,
adminViewConfiguration,
coordinateTransformations,
additionalAxes,
attachmentsOpt
)
case DataFormat.neuroglancerPrecomputed =>
PrecomputedSegmentationLayer(
name,
14 changes: 0 additions & 14 deletions app/models/dataset/WKRemoteDataStoreClient.scala
@@ -9,14 +9,12 @@ import com.scalableminds.webknossos.datastore.explore.{
ExploreRemoteLayerParameters
}
import com.scalableminds.webknossos.datastore.models.{AdditionalCoordinate, RawCuboidRequest}
import com.scalableminds.webknossos.datastore.models.datasource.{DataLayer, GenericDataSource}
import com.scalableminds.webknossos.datastore.rpc.RPC
import com.scalableminds.webknossos.datastore.services.DirectoryStorageReport
import com.typesafe.scalalogging.LazyLogging
import controllers.RpcTokenHolder
import play.api.libs.json.JsObject
import play.utils.UriEncoding
import com.scalableminds.util.objectid.ObjectId

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
@@ -82,18 +80,6 @@ class WKRemoteDataStoreClient(dataStore: DataStore, rpc: RPC) extends LazyLogging {
.silent
.getWithJsonResponse[List[DirectoryStorageReport]]

def addDataSource(organizationId: String,
datasetName: String,
dataSource: GenericDataSource[DataLayer],
folderId: Option[ObjectId],
userToken: String): Fox[Unit] =
for {
_ <- rpc(s"${dataStore.url}/data/datasets/$organizationId/$datasetName")
.addQueryString("token" -> userToken)
.addQueryStringOptional("folderId", folderId.map(_.toString))
.postJson(dataSource)
} yield ()

def hasSegmentIndexFile(organizationId: String, datasetName: String, layerName: String)(
implicit ec: ExecutionContext): Fox[Boolean] = {
val cacheKey = (organizationId, datasetName, layerName)
22 changes: 13 additions & 9 deletions app/models/dataset/explore/WKExploreRemoteLayerService.scala
@@ -105,17 +105,21 @@ class WKExploreRemoteLayerService @Inject()(credentialService: CredentialService
credentialId <- Fox.runOptional(credentialOpt)(c => credentialService.insertOne(c)) ?~> "dataVault.credential.insert.failed"
} yield credentialId

def addRemoteDatasource(dataSource: GenericDataSource[DataLayer],
datasetName: String,
user: User,
folderId: Option[ObjectId])(implicit ctx: DBAccessContext): Fox[Unit] =
def addRemoteDatasourceToDatabase(dataSource: GenericDataSource[DataLayer],
datasetName: String,
user: User,
folderId: Option[ObjectId])(implicit ctx: DBAccessContext): Fox[Unit] =
for {
organization <- organizationDAO.findOne(user._organization)
dataStore <- dataStoreDAO.findOneWithUploadsAllowed
organizationId = user._organization
_ <- datasetService.assertValidDatasetName(datasetName)
client = new WKRemoteDataStoreClient(dataStore, rpc)
userToken <- bearerTokenService.createAndInitDataStoreTokenForUser(user)
_ <- client.addDataSource(organization._id, datasetName, dataSource, folderId, userToken)
datasetId <- datasetService.createVirtualDataset(
dataSource.id.directoryName,
organizationId,
dataStore,
dataSource,
folderId.map(_.toString),
user
)
} yield ()

}
1 change: 1 addition & 0 deletions conf/webknossos.latest.routes
@@ -111,6 +111,7 @@ PUT /datastores/:name/datasources
PUT /datastores/:name/datasources/paths controllers.WKRemoteDataStoreController.updatePaths(name: String, key: String)
GET /datastores/:name/datasources/:organizationId/:directoryName/paths controllers.WKRemoteDataStoreController.getPaths(name: String, key: String, organizationId: String, directoryName: String)
GET /datastores/:name/datasources/:datasetId controllers.WKRemoteDataStoreController.getDataSource(name: String, key: String, datasetId: ObjectId)
POST /datastores/:name/datasources/:organizationId/:directoryName controllers.WKRemoteDataStoreController.registerDataSource(name: String, key: String, organizationId: String, directoryName: String, token: String)
PATCH /datastores/:name/status controllers.WKRemoteDataStoreController.statusUpdate(name: String, key: String)
POST /datastores/:name/reserveUpload controllers.WKRemoteDataStoreController.reserveDatasetUpload(name: String, key: String, token: String)
GET /datastores/:name/getUnfinishedUploadsForUser controllers.WKRemoteDataStoreController.getUnfinishedUploadsForUser(name: String, key: String, token: String, organizationName: String)
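One note on parameter binding in the new routes line: organizationId and directoryName are path segments, while key and token (not appearing in the path) are resolved from the query string, as in the neighboring datastore routes. For illustration, the request URI would be assembled roughly as below — the host, the /api prefix, and the placeholder values are assumptions, not taken from this PR:

// Illustration of the new route's URI shape (path vs. query parameters); values are placeholders.
val registerDataSourceUri: String =
  "https://webknossos.example.org/api/datastores/local-datastore" +
    "/datasources/sample_organization/my_remote_dataset" +
    "?key=<datastore-key>&token=<user-token>"
// The POST body is the JSON-serialized DataSourceRegistrationInfo (the data source plus an optional folderId).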
@@ -97,15 +97,17 @@ export async function requestWithFallback(
batch: Array<BucketAddress>,
): Promise<Array<Uint8Array<ArrayBuffer> | null | undefined>> {
const state = Store.getState();
const datasetId = state.dataset.id;
const datasetDirectoryName = state.dataset.directoryName;
const organization = state.dataset.owningOrganization;
const dataStoreHost = state.dataset.dataStore.url;
const tracingStoreHost = state.annotation.tracingStore.url;

// Prefer datasetId (id) if available, otherwise fall back to old method
const getDataStoreUrl = (optLayerName?: string) =>
`${dataStoreHost}/data/datasets/${organization}/${datasetDirectoryName}/layers/${
optLayerName || layerInfo.name
}`;
datasetId
? `${dataStoreHost}/data/wkDatasets/${datasetId}/layers/${optLayerName || layerInfo.name}`
: `${dataStoreHost}/data/datasets/${organization}/${datasetDirectoryName}/layers/${optLayerName || layerInfo.name}`;

const getTracingStoreUrl = () => `${tracingStoreHost}/tracings/volume/${layerInfo.name}`;

@@ -387,32 +387,17 @@ class DataSourceController @Inject()(
}
}

// Stores a remote dataset in the database.
// Called by the frontend after the user has set datasetName / FolderId of an explored dataSource
// Add this data source to the WK database
def add(organizationId: String, datasetName: String, folderId: Option[String]): Action[DataSource] =
Action.async(validateJson[DataSource]) { implicit request =>
accessTokenService.validateAccessFromTokenContext(UserAccessRequest.administrateDataSources) {
for {
reservedAdditionalInfo <- dsRemoteWebknossosClient.reserveDataSourceUpload(
ReserveUploadInformation(
uploadId = "", // Set by core backend
name = datasetName,
organization = organizationId,
totalFileCount = 1,
filePaths = None,
totalFileSizeInBytes = None,
layersToLink = None,
initialTeams = List.empty,
folderId = folderId,
requireUniqueName = Some(false),
)
) ?~> "dataset.upload.validation.failed"
datasourceId = DataSourceId(reservedAdditionalInfo.directoryName, organizationId)
_ <- dataSourceService.updateDataSource(request.body.copy(id = datasourceId), expectExisting = false)
uploadedDatasetId <- dsRemoteWebknossosClient.reportUpload(datasourceId,
0L,
needsConversion = false,
viaAddRoute = true) ?~> "reportUpload.failed"
} yield Ok(Json.obj("newDatasetId" -> uploadedDatasetId))
_ <- Fox.successful(())
dataSourceId = DataSourceId(datasetName, organizationId)
dataSource = request.body.copy(id = dataSourceId)
datasetId <- dsRemoteWebknossosClient.registerDataSource(dataSource, dataSourceId, folderId) ?~> "dataset.add.failed"
} yield Ok(Json.obj("newDatasetId" -> datasetId))
}
}

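The call to dsRemoteWebknossosClient.registerDataSource above targets the new WKRemoteDataStoreController.registerDataSource route, but the client method itself is outside this excerpt. A rough sketch of what such a datastore-side call might look like, modeled on the RPC style used elsewhere in this PR — the helper name postJsonWithJsonResponse, the /api prefix, and the organizationId field on DataSourceId are assumptions, and the real method presumably takes the user token from an implicit context rather than as an explicit parameter:

// Hypothetical datastore-side RPC call; the real DSRemoteWebknossosClient may differ in naming and plumbing.
def registerDataSource(dataSource: DataSource,
                       dataSourceId: DataSourceId,
                       folderId: Option[String],
                       userToken: String): Fox[String] =
  rpc(
    s"$webknossosUri/api/datastores/$dataStoreName/datasources/${dataSourceId.organizationId}/${dataSourceId.directoryName}")
    .addQueryString("key" -> dataStoreKey)
    .addQueryString("token" -> userToken)
    .postJsonWithJsonResponse[DataSourceRegistrationInfo, String](DataSourceRegistrationInfo(dataSource, folderId))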
@@ -37,7 +37,24 @@ case class N5DataLayer(
override val numChannels: Option[Int] = Some(1),
additionalAxes: Option[Seq[AdditionalAxis]] = None,
attachments: Option[DatasetLayerAttachments] = None,
) extends N5Layer
) extends N5Layer {
override def asAbstractLayer: DataLayerLike =
AbstractDataLayer(
name,
category,
boundingBox,
resolutions,
elementClass,
defaultViewConfiguration,
adminViewConfiguration,
coordinateTransformations,
additionalAxes,
attachments,
Some(mags),
numChannels,
Some(dataFormat)
)
}

object N5DataLayer {
implicit val jsonFormat: OFormat[N5DataLayer] = Json.format[N5DataLayer]
@@ -57,7 +74,26 @@ case class N5SegmentationLayer(
additionalAxes: Option[Seq[AdditionalAxis]] = None,
attachments: Option[DatasetLayerAttachments] = None,
) extends SegmentationLayer
with N5Layer
with N5Layer {
override def asAbstractLayer: DataLayerLike =
AbstractSegmentationLayer(
name,
category,
boundingBox,
resolutions,
elementClass,
largestSegmentId,
mappings,
defaultViewConfiguration,
adminViewConfiguration,
coordinateTransformations,
additionalAxes,
attachments,
Some(mags),
numChannels,
Some(dataFormat)
)
}

object N5SegmentationLayer {
implicit val jsonFormat: OFormat[N5SegmentationLayer] = Json.format[N5SegmentationLayer]