···416416 """Save backfill progress to database"""
417417 try:
418418 async with self.db_pool.acquire() as conn:
419419+ # Convert timezone-aware datetime to naive for PostgreSQL compatibility
420420+ last_update_naive = self.progress.last_update_time.replace(tzinfo=None) if self.progress.last_update_time.tzinfo else self.progress.last_update_time
421421+419422 # Upsert progress to firehose_cursor table
420423 await conn.execute("""
421424 INSERT INTO firehose_cursor (service, cursor, last_event_time, updated_at)
···428431 """,
429432 "backfill",
430433 str(self.progress.current_cursor) if self.progress.current_cursor else None,
431431- self.progress.last_update_time
434434+ last_update_naive
432435 )
433436 except Exception as e:
434437 logger.error(f"[BACKFILL] Error saving progress: {e}")
+55-23
python-firehose/redis_consumer_worker.py
···414414 # Extract avatar/banner CIDs
415415 avatar = record.get('avatar')
416416 if avatar and isinstance(avatar, dict):
417417- avatar_url = avatar.get('ref', {}).get('$link') if isinstance(avatar.get('ref'), dict) else avatar.get('ref')
417417+ ref = avatar.get('ref')
418418+ if isinstance(ref, dict):
419419+ avatar_url = ref.get('$link')
420420+ else:
421421+ avatar_url = ref
422422+ # Convert CID objects to strings
423423+ if avatar_url and not isinstance(avatar_url, str):
424424+ avatar_url = str(avatar_url)
418425419426 banner = record.get('banner')
420427 if banner and isinstance(banner, dict):
421421- banner_url = banner.get('ref', {}).get('$link') if isinstance(banner.get('ref'), dict) else banner.get('ref')
428428+ ref = banner.get('ref')
429429+ if isinstance(ref, dict):
430430+ banner_url = ref.get('$link')
431431+ else:
432432+ banner_url = ref
433433+ # Convert CID objects to strings
434434+ if banner_url and not isinstance(banner_url, str):
435435+ banner_url = str(banner_url)
422436423437 profile_json = json.dumps(record)
424438···458472459473 avatar = record.get('avatar')
460474 if avatar and isinstance(avatar, dict):
461461- avatar_url = avatar.get('ref', {}).get('$link') if isinstance(avatar.get('ref'), dict) else avatar.get('ref')
475475+ ref = avatar.get('ref')
476476+ if isinstance(ref, dict):
477477+ avatar_url = ref.get('$link')
478478+ else:
479479+ avatar_url = ref
480480+ # Convert CID objects to strings
481481+ if avatar_url and not isinstance(avatar_url, str):
482482+ avatar_url = str(avatar_url)
462483463484 created_at = self.safe_date(record.get('createdAt'))
464485···522543523544 avatar = record.get('avatar')
524545 if avatar and isinstance(avatar, dict):
525525- avatar_url = avatar.get('ref', {}).get('$link') if isinstance(avatar.get('ref'), dict) else avatar.get('ref')
546546+ ref = avatar.get('ref')
547547+ if isinstance(ref, dict):
548548+ avatar_url = ref.get('$link')
549549+ else:
550550+ avatar_url = ref
551551+ # Convert CID objects to strings
552552+ if avatar_url and not isinstance(avatar_url, str):
553553+ avatar_url = str(avatar_url)
526554527555 created_at = self.safe_date(record.get('createdAt'))
528556···800828 if not repo or not ops:
801829 return
802830803803- # Acquire database connection and process in transaction
831831+ # Acquire database connection
804832 async with self.db.acquire() as conn:
805805- async with conn.transaction():
806806- for op in ops:
807807- action = op.get('action')
808808- path = op.get('path')
809809- record = op.get('record')
810810- cid = op.get('cid')
811811-812812- if not path:
813813- continue
814814-815815- collection = path.split("/")[0]
816816- uri = f"at://{repo}/{path}"
817817-818818- try:
833833+ # Process each operation in its own transaction to prevent
834834+ # errors in one operation from aborting subsequent operations
835835+ for op in ops:
836836+ action = op.get('action')
837837+ path = op.get('path')
838838+ record = op.get('record')
839839+ cid = op.get('cid')
840840+841841+ if not path:
842842+ continue
843843+844844+ collection = path.split("/")[0]
845845+ uri = f"at://{repo}/{path}"
846846+847847+ try:
848848+ async with conn.transaction():
819849 if action in ["create", "update"] and record:
820850 record_type = record.get('$type')
821851···891921892922 elif action == "delete":
893923 await self.process_delete(conn, uri, collection)
894894-895895- except Exception as e:
896896- logger.error(f"Error processing {action} {uri}: {e}")
897897- continue
924924+925925+ except Exception as e:
926926+ # Log error and continue with next operation
927927+ # Transaction will be automatically rolled back
928928+ logger.error(f"Error processing {action} {uri}: {e}")
929929+ continue
898930899931 self.event_count += 1
900932 if self.event_count % 1000 == 0:
+23-16
python-firehose/unified_worker.py
···677677 return ref if ref != 'undefined' else None
678678 if hasattr(ref, '$link'):
679679 link = ref.get('$link') if isinstance(ref, dict) else getattr(ref, '$link', None)
680680- return link if link != 'undefined' else None
681681- if hasattr(ref, 'toString'):
680680+ if link and link != 'undefined':
681681+ return str(link) if not isinstance(link, str) else link
682682+ # Convert ref to string if it's a CID object
683683+ if ref and ref != 'undefined':
682684 return str(ref)
683685684686 if hasattr(blob, 'cid'):
685685- return blob.cid if blob.cid != 'undefined' else None
687687+ cid = blob.cid
688688+ if cid and cid != 'undefined':
689689+ return str(cid) if not isinstance(cid, str) else cid
686690687691 return None
688692···1853185718541858 # Acquire database connection
18551859 async with self.db.acquire() as conn:
18561856- # Process operations in a transaction
18571857- async with conn.transaction():
18581858- for op in commit.ops:
18591859- action = op.action
18601860- path = op.path
18611861- collection = path.split("/")[0]
18621862- uri = f"at://{repo}/{path}"
18631863-18641864- try:
18601860+ # Process each operation in its own transaction to prevent
18611861+ # errors in one operation from aborting subsequent operations
18621862+ for op in commit.ops:
18631863+ action = op.action
18641864+ path = op.path
18651865+ collection = path.split("/")[0]
18661866+ uri = f"at://{repo}/{path}"
18671867+18681868+ try:
18691869+ async with conn.transaction():
18651870 if action in ["create", "update"]:
18661871 # Extract record from CAR blocks
18671872 if not car or not hasattr(op, 'cid') or not op.cid:
···1960196519611966 elif action == "delete":
19621967 await self.process_delete(conn, uri, collection)
19631963-19641964- except Exception as e:
19651965- logger.error(f"Error processing {action} {uri}: {e}")
19661966- continue
19681968+19691969+ except Exception as e:
19701970+ # Log error and continue with next operation
19711971+ # Transaction will be automatically rolled back
19721972+ logger.error(f"Error processing {action} {uri}: {e}")
19731973+ continue
1967197419681975 self.event_count += 1
19691976 if self.event_count % 1000 == 0: