Commit 329fd4d8 authored by nimrod

- First commit.

.gitignore 0 → 100644
~*
*~
*.swp
*.swo
*.pyc
dcp.py 0 → 100755
#!/usr/bin/env python3
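'''Parse a Digital Cinema Package (DCP) directory: read the ASSETMAP and the
packing list(s), then verify that every asset has the expected size and
base64-encoded SHA-1 hash.  The DCP class can also copy the whole package to
another directory, re-hashing each file as it is written.

Usage: dcp.py [DIRECTORY]   (defaults to the current directory)
'''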
from hashlib import sha1
from base64 import b64encode
from os import stat, listdir, mkdir, statvfs
from os.path import isdir, basename, exists
from shutil import copyfile
from xml.dom.minidom import parse
import sys
import vlc
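# python-vlc (libVLC) is only used to read the duration of MXF assets.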
class Asset(object):
    '''A single asset that is part of a whole DCP package.'''
def verifySize(self):
        '''Verify that the size of the file is correct, if an expected size
        is known.'''
try:
return exists(self.fullpath) and stat(self.fullpath).st_size ==\
self.size
except AttributeError:
return True
def verifyHash(self):
'''verify that the hash is correct, if present.'''
if not hasattr(self, 'hash'):
return True
# from os import stat
# try:
# buffersize = stat(self.fullpath).st_blksize
# except:
# buffersize = 2**16
        # The packing list hash is a base64-encoded SHA-1 digest of the file.
        buffersize = 2 ** 16
        hash = sha1()
        with open(self.fullpath, 'rb') as fh:
            buffer = fh.read(buffersize)
            while buffer:
                hash.update(buffer)
                buffer = fh.read(buffersize)
        return self.hash == b64encode(hash.digest()).decode()
def copyAndVerify(self, destination):
        '''Copy the file to the destination directory, verifying the hash
        while copying.'''
        newfullpath = destination + '/' + self.filename
        if not hasattr(self, 'hash'):
            copyfile(self.fullpath, newfullpath)
            self.fullpath = newfullpath
            self.rootpath = destination
            return True
        buffersize = 2 ** 16
        hash = sha1()
        with open(self.fullpath, 'rb') as in_fh, \
                open(newfullpath, 'wb') as out_fh:
            buffer = in_fh.read(buffersize)
            while buffer:
                hash.update(buffer)
                out_fh.write(buffer)
                buffer = in_fh.read(buffersize)
        self.rootpath = destination
        self.fullpath = newfullpath
        return self.hash == b64encode(hash.digest()).decode()
    def add_duration(self):
        '''Use libVLC to read the duration (in milliseconds) of MXF assets.'''
        if hasattr(self, 'type') and 'mxf' in self.type:
            instance = vlc.Instance()
            media = instance.media_new('file://' + self.fullpath)
            media.parse()
            self.duration = media.get_duration()
def __init__(self, rootpath, filename, id=None, hash=None, size=None,
packinglist=False, type=None):
        '''Initialize an asset; a root path and a filename are required.'''
        self.rootpath = rootpath
        # Asset maps may list paths as file:/// URIs; strip the scheme.
        if filename.startswith('file:///'):
            filename = filename[len('file:///'):]
        self.filename = filename
        self.fullpath = rootpath + '/' + filename
if id is not None:
self.id = id
if hash is not None:
self.hash = hash
if size is not None:
self.size = size
if type is not None:
self.type = type
self.packinglist = packinglist
class DCP:
'''A complete DCP package.'''
def verify(self):
'''Verifies that all assets are of the correct size and have
the correct hash.'''
for asset in self.assets:
try:
if not asset.verifySize():
return False
except BaseException as e:
                raise RuntimeError('Failed size comparison for ' +
                                   asset.filename) from e
        # Sort the assets by size before calculating hashes, for performance.
        self.assets.sort(key=lambda x: getattr(x, 'size', 0))
for asset in self.assets:
try:
if not asset.verifyHash():
return False
except BaseException as e:
raise RuntimeError('Failed hash calculation for ' +
asset.filename) from e
return True
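    # The parsers below handle the files that make up a DCP: the ASSETMAP
    # (the list of all files in the package), the packing lists (PKLs, which
    # carry each asset's id, hash, size and type), an optional VOLINDEX and
    # the composition playlist (CPL).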
def _parse_assetmap(self):
'''Adds the asset map file to the list of assets and parses it.'''
if 'ASSETMAP.xml' in listdir(self.directory):
filename = 'ASSETMAP.xml'
elif 'ASSETMAP' in listdir(self.directory):
filename = 'ASSETMAP'
else:
raise RuntimeError('Couldn\'t find assetmap file')
        self.assets = [Asset(self.directory, filename, type='text/xml')]
        assetmap = self.assets[0].fullpath
        try:
            assetmap = parse(assetmap).getElementsByTagName('Asset')
for element in assetmap:
id = element.getElementsByTagName('Id')[0].firstChild.data
id = id.split(':')[-1]
filename = element.getElementsByTagName('Path')[0]\
.firstChild.data
                packinglist = len(
                    element.getElementsByTagName('PackingList')) > 0
if packinglist:
self.assets.append(Asset(self.directory, filename,
id=id, packinglist=packinglist,
type='text/xml'))
else:
self.assets.append(Asset(self.directory, filename,
id=id, packinglist=packinglist))
except BaseException as e:
raise RuntimeError('Failed to parse assetmap file') from e
def _add_volindex(self):
'''Adds the volume index file to the list of assets.'''
if 'VOLINDEX.xml' in listdir(self.directory):
filename = 'VOLINDEX.xml'
elif 'VOLINDEX' in listdir(self.directory):
filename = 'VOLINDEX'
else:
            # A missing volume index is tolerated rather than raising an
            # error.
            return
self.assets.append(Asset(self.directory, filename, type='text/xml'))
def _parse_packinglist(self):
'''Parses the packing list.'''
try:
pkls = (parse(x.fullpath) for x in self.assets if x.packinglist)
for pkl in pkls:
if not hasattr(self, 'signed'):
self.signed = len(pkl.getElementsByTagName('Signer')) > 0
try:
if not hasattr(self, 'name'):
self.name = pkl\
.getElementsByTagName('AnnotationText')[0]\
.firstChild.data.strip()
                except (AttributeError, IndexError):
                    pass
for element in pkl.getElementsByTagName('Asset'):
id = element.getElementsByTagName('Id')[0].firstChild.data
id = id.split(':')[-1]
hash = element.getElementsByTagName('Hash')[0]\
.firstChild.data
type = element.getElementsByTagName('Type')[0]\
.firstChild.data
size = int(element.getElementsByTagName('Size')[0]
.firstChild.data)
asset = [x for x in self.assets if hasattr(x, 'id') and
x.id == id][0]
asset.hash = hash
asset.size = size
asset.type = type
asset.add_duration()
except BaseException as e:
raise RuntimeError('Failed to parse packinglist file') from e
def _find_cpl(self):
'''Goes through the xml files, finds the CPL and extracts the data from
it.'''
        for asset in [x for x in self.assets if hasattr(x, 'type') and
                      'xml' in x.type]:
elements = (parse(asset.fullpath))\
.getElementsByTagName('CompositionPlaylist')
if len(elements) > 0:
try:
self.cpl_id = elements[0]\
.getElementsByTagName('Id')[0].firstChild.data
self.cpl_date = elements[0]\
.getElementsByTagName('IssueDate')[0].firstChild.data
                except (AttributeError, IndexError):
                    pass
def __init__(self, directory):
'''Parses the DCP in the directory specified.'''
self.directory = directory
self._parse_assetmap()
self._add_volindex()
self._parse_packinglist()
        try:
            self.duration = max(x.duration for x in self.assets
                                if hasattr(x, 'duration'))
        except ValueError:
            # No asset reported a duration.
            self.duration = 'Unknown'
self._find_cpl()
def copyAndVerify(self, destination):
        '''Copy the DCP to the destination directory, verifying while
        copying.'''
        totalsize = 0
        for asset in self.assets:
            totalsize += stat(asset.fullpath).st_size
            try:
                if not asset.verifySize():
                    return False
            except BaseException as e:
                raise RuntimeError('Failed size comparison for '
                                   + asset.filename) from e
        # Make sure the destination has enough free space for all assets.
        freespace = statvfs(destination).f_bavail \
            * statvfs(destination).f_bsize
        if freespace < totalsize:
            return False
        # Sort the assets by size before calculating hashes, for performance.
        self.assets.sort(key=lambda x: getattr(x, 'size', 0))
newdirectory = destination + '/' + basename(self.directory)
mkdir(newdirectory)
for asset in self.assets:
try:
if not asset.copyAndVerify(newdirectory):
return False
except BaseException as e:
raise RuntimeError('Failed hash calculation for ' +
asset.filename) from e
self.directory = newdirectory
return True
if __name__ == '__main__':
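    # Parse the DCP in the directory given on the command line (or the
    # current directory), print its name, signature status and duration,
    # then verify the hashes.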
if len(sys.argv) == 2:
dcp = DCP(sys.argv[1])
else:
dcp = DCP('./')
    try:
        print('Name:', dcp.name)
    except AttributeError:
        pass
    if getattr(dcp, 'signed', False):
        print('DCP is signed')
    else:
        print('DCP is unsigned')
print('Duration:', dcp.duration)
if dcp.verify():
print('Verification succeeded.')
else:
print('Verification failed.')
exit(0)