Skip to content

Commit

Permalink
feat: Start looking at non RLL routines and add log tp
Browse files Browse the repository at this point in the history
  • Loading branch information
hutcheb committed Jul 10, 2024
1 parent c91a609 commit b0d2f6a
Show file tree
Hide file tree
Showing 9 changed files with 3,332 additions and 35 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ ExtractAcdDatabaseRecordsToFiles('CuteLogix.ACD', 'output_directory').extract()
### Dump Comps Database Records

The Comps database contains a lot of information and can be export as a directory structure to make it easier to look at.
It will also extract the CIP class and instance and write it to the log file.

```python
from acd.api import DumpCompsRecordsToFile
Expand Down
4 changes: 3 additions & 1 deletion acd/api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from dataclasses import dataclass
from os import PathLike
from typing import List
Expand Down Expand Up @@ -118,4 +119,5 @@ class DumpCompsRecordsToFile(ExportProject):

def extract(self):
export = ExportL5x(self.filename)
DumpCompsRecords(export._cur, 0).dump(0)
with open(os.path.join(self.output_directory, export.project.target_name + ".log"), "w") as log_file:
DumpCompsRecords(export._cur, 0).dump(log_file=log_file)
17 changes: 16 additions & 1 deletion acd/generated/comps/rx_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ def _read(self):
self.cip_type = self._io.read_u2le()
self.comment_id = self._io.read_u2le()
_on = self.cip_type
if _on == 107:
if _on == 104:
self._raw_main_record = self._io.read_bytes(60)
_io__raw_main_record = KaitaiStream(BytesIO(self._raw_main_record))
self.main_record = RxGeneric.RxTag(_io__raw_main_record, self, self._root)
elif _on == 107:
self._raw_main_record = self._io.read_bytes(60)
_io__raw_main_record = KaitaiStream(BytesIO(self._raw_main_record))
self.main_record = RxGeneric.RxTag(_io__raw_main_record, self, self._root)
Expand Down Expand Up @@ -258,4 +262,15 @@ def _read(self):
self.value = self._io.read_bytes(self.len_value)


@property
def record_buffer(self):
if hasattr(self, '_m_record_buffer'):
return self._m_record_buffer

_pos = self._io.pos()
self._io.seek(14)
self._m_record_buffer = self._io.read_bytes(60)
self._io.seek(_pos)
return getattr(self, '_m_record_buffer', None)


70 changes: 50 additions & 20 deletions acd/l5x/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@
from datetime import datetime, timedelta
import xml.etree.ElementTree as ET

from acd.exceptions.CompsRecordException import UnknownRxTagVersion
from acd.generated.comps.rx_generic import RxGeneric
from acd.generated.comps.rx_tag import RxTag
from acd.generated.controller.rx_controller import RxController
from acd.generated.map_device.rx_map_device import RxMapDevice

from loguru import logger as log

@dataclass
class L5xElementBuilder:
Expand All @@ -39,7 +36,7 @@ def to_xml(self):
if isinstance(attribute_value, L5xElement):
child_list.append(attribute_value.to_xml())
elif isinstance(attribute_value, list):
if attribute == "tags" or attribute == "data_types" or attribute == "members":
if attribute == "tags" or attribute == "data_types" or attribute == "members" or attribute == "programs" or attribute == "routines":
new_child_list: List[str] = []
for element in attribute_value:
if isinstance(element, L5xElement):
Expand Down Expand Up @@ -99,6 +96,8 @@ class MapDevice(L5xElement):

@dataclass
class Routine(L5xElement):
name: str
type: str
rungs: List[str]


Expand Down Expand Up @@ -137,7 +136,7 @@ class RSLogix5000Content(L5xElement):
schema_revision: str
software_revision: str
target_name: str
target_name: str
target_type: str
contains_context: str
export_date: str
export_options: str
Expand Down Expand Up @@ -336,16 +335,16 @@ def build(self) -> Tag:
except Exception as e:
return Tag(results[0][0], results[0][0], "Base", "", "Decimal", "None", 0, [])

if r.cip_type != 0x6B:
if r.cip_type != 0x6B and r.cip_type != 0x68:
return Tag(results[0][0], results[0][0], "Base", "", "Decimal", "None", 0, [])
if r.main_record.data_type == 0xFFFFFFFF:
return Tag(results[0][0], results[0][0], "Base", "", "Decimal", "None", r.main_record.data_table_instance, [])

self._cur.execute(
"SELECT comp_name, object_id, parent_id, record FROM comps WHERE object_id=" + str(
r.main_record.data_type))
data_type_results = self._cur.fetchall()
data_type = data_type_results[0][0]
data_type = ""
else:
self._cur.execute(
"SELECT comp_name, object_id, parent_id, record FROM comps WHERE object_id=" + str(
r.main_record.data_type))
data_type_results = self._cur.fetchall()
data_type = data_type_results[0][0]

self._cur.execute(
"SELECT tag_reference, record_string FROM comments WHERE parent=" + str(
Expand All @@ -372,6 +371,24 @@ def build(self) -> Tag:
return Tag(name, name, "Base", data_type, radix, external_access, r.main_record.data_table_instance, comment_results)


def routine_type_enum(idx: int) -> str:
if idx == 0:
return "TypeLess"
if idx == 1:
return "RLL"
if idx == 2:
return "FBD"
if idx == 3:
return "SFC"
if idx == 4:
return "ST"
if idx == 5:
return "External"
if idx == 6:
return "Encrypted"
return "Typeless"


@dataclass
class RoutineBuilder(L5xElementBuilder):

Expand All @@ -381,8 +398,14 @@ def build(self) -> Routine:
self._object_id))
results = self._cur.fetchall()

try:
r = RxGeneric.from_bytes(results[0][3])
except Exception as e:
return Routine(results[0][0], results[0][0], "", [])

record = results[0][3]
name = results[0][0]
routine_type = routine_type_enum(struct.unpack_from("<H", r.record_buffer, 0x30)[0])

self._cur.execute(
"SELECT object_id, parent_id, seq_no FROM region_map WHERE parent_id=" + str(
Expand All @@ -396,7 +419,7 @@ def build(self) -> Routine:
rungs_results = self._cur.fetchall()
if len(rungs_results) > 0:
rungs.append(rungs_results[0][1])
return Routine(name, rungs)
return Routine(name, name, routine_type, rungs)


@dataclass
Expand Down Expand Up @@ -459,6 +482,8 @@ def build(self) -> Program:
self._object_id))
results = self._cur.fetchall()

r = RxGeneric.from_bytes(results[0][3])

name = results[0][0]

self._cur.execute(
Expand Down Expand Up @@ -492,6 +517,11 @@ def build(self) -> Program:
for result in results:
tags.append(TagBuilder(self._cur, result[1]).build())

self._cur.execute(
"SELECT tag_reference, record_string FROM comments WHERE parent=" + str(
(r.comment_id * 0x10000) + r.cip_type))
comment_results = self._cur.fetchall()

return Program(name, routines, tags)


Expand Down Expand Up @@ -641,14 +671,14 @@ def build(self) -> RSLogix5000Content:
now = datetime.now()
export_date = now.strftime("%a %b %d %H:%M:%S %Y")
export_options = "NoRawData L5KData DecoratedData ForceProtectedEncoding AllProjDocTrans"
return RSLogix5000Content(None, schema_revision, software_revision, target_name, target_type, contains_context, export_date, export_options)
return RSLogix5000Content(target_name, None, schema_revision, software_revision, target_name, target_type, contains_context, export_date, export_options)


@dataclass
class DumpCompsRecords(L5xElementBuilder):
base_directory: PathLike = Path("dump")

def dump(self, parent_id: int = 0):
def dump(self, parent_id: int = 0, log_file=None):
self._cur.execute(
f"SELECT comp_name, object_id, parent_id, record_type, record FROM comps WHERE parent_id={parent_id}")
results = self._cur.fetchall()
Expand All @@ -663,8 +693,8 @@ def dump(self, parent_id: int = 0):
if not os.path.exists(os.path.join(new_path)):
os.makedirs(new_path)
with open(Path(os.path.join(new_path, name + ".dat")), "wb") as file:
log_file.write(
f"Class - {struct.unpack_from('<H', result[4], 0xA)[0]} Instance {struct.unpack_from('<H', result[4], 0xC)[0]}- {str(new_path) + '/' + name}\n")
file.write(record)

DumpCompsRecords(self._cur, object_id, new_path).dump(object_id)


DumpCompsRecords(self._cur, object_id, new_path).dump(object_id, log_file)
12 changes: 5 additions & 7 deletions acd/nameless.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ class NamelessRecord:
dat_record: DatRecord

def __post_init__(self):
if self.dat_record.identifier == b'\xfa\xfa':
if self.dat_record.identifier == 64250:
identifier_offset = 8
self.identifier = struct.unpack(
"I", self.dat_record.record[identifier_offset : identifier_offset + 4]
"I", self.dat_record.record.record_buffer[identifier_offset : identifier_offset + 4]
)[0]

not_sure_offset = 12
self.not_sure = struct.unpack(
"I", self.dat_record.record[not_sure_offset: not_sure_offset + 4]
)[0]
object_identifier_offset = 0x0C
self.object_identifier = struct.unpack_from("<I", self.dat_record.record.record_buffer, object_identifier_offset)[0]

query: str = "INSERT INTO nameless VALUES (?, ?, ?)"
enty: tuple = (self.identifier, self.not_sure, self.dat_record.record)
enty: tuple = (self.object_identifier, self.identifier, self.dat_record.record.record_buffer)
self._cur.execute(query, enty)
7 changes: 6 additions & 1 deletion acd/sbregion.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def __post_init__(self):
else:
return

if r.header.language_type == "Rung NT":
if r.header.language_type == "Rung NT" or r.header.language_type == "REGION NT":
text = r.record_buffer.decode("utf-16-le").rstrip('\x00')
self.text = self.replace_tag_references(text)

Expand All @@ -29,6 +29,9 @@ def __post_init__(self):
self._cur.execute(query, entry)
elif r.header.language_type == "REGION AST":
pass
elif r.header.language_type == "REGION LE UID":
uuid = struct.unpack("<I", r.record_buffer[-4:])[0]
pass
else:
pass

Expand All @@ -40,5 +43,7 @@ def replace_tag_references(self, sb_rec):
tag_id = int(tag_no, 16)
self._cur.execute("SELECT object_id, comp_name FROM comps WHERE object_id=" + str(tag_id))
results = self._cur.fetchall()
if len(results) == 0:
return sb_rec
sb_rec = sb_rec.replace(tag, results[0][1])
return sb_rec
Loading

0 comments on commit b0d2f6a

Please sign in to comment.