111 lines
3.1 KiB
Python
111 lines
3.1 KiB
Python
"""
|
|
core.pkg.backend.local - Vhex core backend for local package
|
|
"""
|
|
import os
|
|
import subprocess
|
|
|
|
from core.logger import log
|
|
from core.pkg.backend.core import VxRemoteBackendCore
|
|
from core.pkg.version import VxVersion
|
|
|
|
__all__ = [
|
|
'VxBackendLocal'
|
|
]
|
|
|
|
#---
|
|
# Public
|
|
#---
|
|
|
|
class VxBackendLocal(VxRemoteBackendCore):
|
|
"""
|
|
Vhex backend local package class
|
|
"""
|
|
|
|
#---
|
|
# Public properties
|
|
#---
|
|
|
|
@property
|
|
def name(self):
|
|
return 'local'
|
|
|
|
@property
|
|
def url(self):
|
|
return self._url
|
|
|
|
@property
|
|
def package_list(self):
|
|
if self._pkg_list:
|
|
return self._pkg_list
|
|
|
|
if not os.path.exists(self._url):
|
|
os.makedirs(self._url)
|
|
return []
|
|
|
|
self._pkg_list = []
|
|
for file in os.listdir(self._url):
|
|
if not os.path.isdir(f"{self._url}/{file}"):
|
|
continue
|
|
exists = False
|
|
for pkg in self._pkg_list:
|
|
if pkg['name'] == file.split('@')[1]:
|
|
exists = True
|
|
break
|
|
if exists:
|
|
continue
|
|
self._pkg_list.append({
|
|
'name' : file.split('@')[1],
|
|
'full_name' : f"{self._url}/{file.split('@')[1]}",
|
|
'description' : None,
|
|
'url' : f"{self._url}/{file}",
|
|
'created' : None,
|
|
'updated' : None,
|
|
'author' : file.split('@')[0],
|
|
'default_branch' : None,
|
|
'storage' : 'local',
|
|
'versions' : [
|
|
VxVersion(file.split('@')[2], file.split('@')[3], 'local')
|
|
]
|
|
})
|
|
return self._pkg_list
|
|
|
|
#---
|
|
# Private methods
|
|
#---
|
|
|
|
def package_fetch_versions(self, pkg):
|
|
return pkg['versions']
|
|
|
|
#---
|
|
# Public methods
|
|
#---
|
|
|
|
def package_clone(self, pkg, prefix='', bare=False):
|
|
if prefix:
|
|
output_path = prefix
|
|
if prefix[-1] == '/':
|
|
output_path = f"{prefix}{pkg['name']}"
|
|
if not os.path.isdir(prefix):
|
|
log.error(
|
|
f"package '{pkg['name']}' cannot be cloned at "
|
|
f"'{prefix}' because this path is not a directory"
|
|
)
|
|
return ''
|
|
else:
|
|
if not os.path.exists(prefix := os.getcwd()):
|
|
os.makedirs(prefix)
|
|
output_path = f"{prefix}/{pkg['name']}"
|
|
if not os.path.exists(output_path):
|
|
if not bare:
|
|
log.debug(f"[local] link '{pkg['url']}' > '{output_path}'")
|
|
os.symlink(pkg['url'], output_path, target_is_directory=True)
|
|
else:
|
|
log.debug(f"[local] bare cpy '{pkg['url']}' > '{output_path}'")
|
|
subprocess.run(
|
|
f"cp -r {pkg['url']} {output_path}".split(),
|
|
check=False
|
|
)
|
|
else:
|
|
log.debug(f"[local] file '{output_path}' already exists, skipped")
|
|
return output_path
|