SyntaxHighlighter

2015年12月14日月曜日

PythonでEnOceanの電文を読み取る

太陽光で動作するEnOceanマグネットセンサSTM429J、温度センサSTM431Jを利用する際のメモ。
Python2.7.9+pySerialで実装。

ESP:EnOcean Serial Protocol

詳しいドキュメントはEnOceanのサイトからDLできる

EnOceanの電文は上記の図(ドキュメントから引用)のようになっている。
  1. Sync. Byte: 同期用の信号で、常に0x55で固定。
  2. header: 4byte固定。2番目のbyteにdataの、3番目にoptional dataの長さが書かれている。
  3. CRC8H: チェックサム。
  4. data: センサIDや動作内容が記載されている。
    • センサid: dataの2~5番目に書かれている4byte。
    • マグネットセンサ(STM429J): dataの6番目のbyteが0x08なら開、0x09なら閉。
    • 温度センサ(STM431J): dataの8番目のbyteを元に算出する。例えば5dなら25.4℃。
      temp = (255.0-val)/255.0*40.0
  5. optional data: 長さはheaderに書かれている。
  6. CRC8D: チェックサム。

Pythonのソースコード

CentOS環境でPython2.7.9/pySerialにより実装した例。
シリアル通信で1byteずつ読み取る。
電文の先頭はかならず0x55になるので、それを起点としてカウントする。
# coding: UTF-8
from serial import *
from sys import exit
from datetime import datetime

port = '/dev/ttyUSB0'

# シリアルポートを開く
try:
    ser = Serial(port, 57600)
    print('open serial port: %s' % port)
except:
    print('cannot open serial port: %s' % port)
    exit(1)

# 初期化
cnt,dataLen,optLen = 0,0,0
telegraph,headList,dataList,optList = [],[],[],[]
ready = True # 電文開始のフラグ管理

# データの解釈とログの記録
while True:
    s = ser.read().encode('hex') # 1byteずつ読み込み
    if s == '55' and ready: # 電文開始
        # 変数のリセット
        cnt,dataLen,optLen = 0,0,0
        telegraph,headList,dataList,optList = [],[],[],[]
        ready = False
        print '=========='
    cnt += 1
    telegraph.append(s)
    if 2 <= cnt <= 5: # header
        headList.append(s)
    if cnt == 5: # header終了, data length取得
        dataLen = int(headList[1],16)
        optLen  = int(headList[2],16)
    if 7 <= cnt <= (6+dataLen): # data
        dataList.append(s)
    if (7+dataLen) <= cnt <= (6+dataLen+optLen): # optional data
        optList.append(s)
    if cnt == (6+dataLen+optLen+1): # 電文終了
        ready = True
        dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # ログ出力
        print dt
        print ':'.join(telegraph)
        print 'head...', ':'.join(headList)
        print 'data...', ':'.join(dataList), '(length=%d)' % dataLen
        print 'opt ...', ':'.join(optList),  '(length=%d)' % optLen
        sensorId = ':'.join(dataList[1:5]) # センサID取得
        print 'sid ...', sensorId
        # マグネットセンサ
        if sensorId == '04:00:03:df':
            if   dataList[5] == '08':
                action = 'open'
            elif dataList[5] == '09':
                action = 'close'
            print 'door...', action
        # 温度センサ
        elif sensorId == '04:00:7a:fc':
            val = int(dataList[7],16)
            temp = (255.0-val)/255.0*40.0
            print 'temp...', temp
        # 上記以外のセンサIDは無視
        else:
            continue
出力結果は以下のとおり。
今のところ変な挙動もなく、ドア開閉や気温をきちんと読み取れている。
open serial port: /dev/ttyUSB0
==========
2015-12-14 16:12:58
55:00:07:02:0a:0a:21:04:00:03:df:08:61:01:3a:b3
head... 00:07:02:0a
data... 21:04:00:03:df:08:61 (length=7)
opt ... 01:3a (length=2)
sid ... 04:00:03:df
door... open
==========
2015-12-14 16:13:05
55:00:07:02:0a:0a:21:04:00:03:df:09:66:01:3a:b3
head... 00:07:02:0a
data... 21:04:00:03:df:09:66 (length=7)
opt ... 01:3a (length=2)
sid ... 04:00:03:df
door... close
==========
2015-12-14 16:17:32
55:00:0a:02:0a:9b:22:04:00:7a:fc:00:00:5d:08:3f:01:37:90
head... 00:0a:02:0a
data... 22:04:00:7a:fc:00:00:5d:08:3f (length=10)
opt ... 01:37 (length=2)
sid ... 04:00:7a:fc
temp... 25.4117647059
当初、pySerialのreadline()で1行ずつ受信しようとしたら上手くいかなかった。
面倒でもread()で1byteずつ読み取って、ヘッダ部分からデータ長など読み取りつつ処理する必要がある。

----
2016/6/21追記:プログラムにバグがあるという指摘を受けて修正。Bool型変数readyによる、電文開始フラグに関する処理を追加した。